Updated to match code changes to asterix
Added Procedure Langauge and Metadata
Restructured to fit with bom pom
Added ChannelJobService for execution tasks
Added string constants file
Added BAD Rewrite Rule Set
Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
new file mode 100644
index 0000000..a906ae6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+public interface BADConstants {
+ public static final String SubscriptionId = "subscriptionId";
+ public static final String BrokerName = "BrokerName";
+ public static final String ChannelName = "ChannelName";
+ public static final String ProcedureName = "ProcedureName";
+ public static final String DataverseName = "DataverseName";
+ public static final String BrokerEndPoint = "BrokerEndPoint";
+ public static final String DeliveryTime = "deliveryTime";
+ public static final String ResultId = "resultId";
+ public static final String ChannelExecutionTime = "channelExecutionTime";
+ public static final String ChannelSubscriptionsType = "ChannelSubscriptionsType";
+ public static final String ChannelResultsType = "ChannelResultsType";
+ public static final String ResultsDatasetName = "ResultsDatasetName";
+ public static final String SubscriptionsDatasetName = "SubscriptionsDatasetName";
+ public static final String CHANNEL_EXTENSION_NAME = "Channel";
+ public static final String PROCEDURE_KEYWORD = "Procedure";
+ public static final String BROKER_KEYWORD = "Broker";
+ public static final String RECORD_TYPENAME_BROKER = "BrokerRecordType";
+ public static final String RECORD_TYPENAME_CHANNEL = "ChannelRecordType";
+ public static final String RECORD_TYPENAME_PROCEDURE = "ProcedureRecordType";
+ public static final String subscriptionEnding = "Subscriptions";
+ public static final String resultsEnding = "Results";
+ public static final String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
+ public static final String BAD_DATAVERSE_NAME = "Metadata";
+ public static final String Duration = "Duration";
+ public static final String Function = "Function";
+ public static final String FIELD_NAME_ARITY = "Arity";
+ public static final String FIELD_NAME_PARAMS = "Params";
+ public static final String FIELD_NAME_RETURN_TYPE = "ReturnType";
+ public static final String FIELD_NAME_DEFINITION = "Definition";
+ public static final String FIELD_NAME_LANGUAGE = "Language";
+
+ public enum ChannelJobType {
+ REPETITIVE
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
new file mode 100644
index 0000000..da0c43b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+import java.util.List;
+
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants.ChannelJobType;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class ChannelJobInfo extends ActiveJob {
+
+ private static final long serialVersionUID = 1L;
+ private List<String> locations;
+
+ public ChannelJobInfo(EntityId entityId, JobId jobId, ActivityState state, JobSpecification spec) {
+ super(entityId, jobId, state, ChannelJobType.REPETITIVE, spec);
+ }
+
+ public List<String> getLocations() {
+ return locations;
+
+ }
+
+ public void setLocations(List<String> locations) {
+ this.locations = locations;
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
new file mode 100644
index 0000000..d1df438
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.AUUID;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Provides functionality for running channel jobs and communicating with Brokers
+ */
+public class ChannelJobService {
+
+ private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());
+
+ public static ScheduledExecutorService startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
+ IHyracksClientConnection hcc, long duration)
+ throws Exception {
+ ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ executeJob(jobSpec, jobFlags, jobId, hcc);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Channel Job Failed to run.", e);
+ }
+ }
+ }, duration, duration, TimeUnit.MILLISECONDS);
+ return scheduledExecutorService;
+ }
+
+ public static void executeJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
+ IHyracksClientConnection hcc)
+ throws Exception {
+ LOGGER.info("Executing Channel Job");
+ if (jobId == null) {
+ hcc.startJob(jobSpec, jobFlags);
+ } else {
+ hcc.startJob(jobSpec, jobFlags, jobId);
+ }
+ }
+
+ public static void runChannelJob(JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception {
+ JobId jobId = hcc.startJob(channeljobSpec);
+ hcc.waitForCompletion(jobId);
+ }
+
+ public static void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint,
+ AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException {
+ String formattedString;
+ formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
+ sendMessage(brokerEndpoint, formattedString);
+ }
+
+ public static String formatJSON(EntityId activeJobId, AOrderedList subscriptionIds, String channelExecutionTime) {
+ String JSON = "{ \"dataverseName\":\"" + activeJobId.getDataverse() + "\", \"channelName\":\""
+ + activeJobId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+ + channelExecutionTime + "\", \"subscriptionIds\":[";
+ for (int i = 0; i < subscriptionIds.size(); i++) {
+ AUUID subId = (AUUID) subscriptionIds.getItem(i);
+ String subString = subId.toSimpleString();
+ JSON += "\"" + subString + "\"";
+ if (i < subscriptionIds.size() - 1) {
+ JSON += ",";
+ }
+ }
+ JSON += "]}";
+ return JSON;
+
+ }
+
+ public static long findPeriod(String duration) {
+ //TODO: Allow Repetitive Channels to use YMD durations
+ String hoursMinutesSeconds = "";
+ if (duration.indexOf('T') != -1) {
+ hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
+ }
+ double seconds = 0;
+ if (hoursMinutesSeconds != "") {
+ int pos = 0;
+ if (hoursMinutesSeconds.indexOf('H') != -1) {
+ Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
+ seconds += (hours * 60 * 60);
+ pos = hoursMinutesSeconds.indexOf('H') + 1;
+ }
+ if (hoursMinutesSeconds.indexOf('M') != -1) {
+ Double minutes =
+ Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
+ seconds += (minutes * 60);
+ pos = hoursMinutesSeconds.indexOf('M') + 1;
+ }
+ if (hoursMinutesSeconds.indexOf('S') != -1) {
+ Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
+ seconds += (s);
+ }
+ }
+ return (long) (seconds * 1000);
+ }
+
+ public static void sendMessage(String targetURL, String urlParameters) {
+ HttpURLConnection connection = null;
+ try {
+ //Create connection
+ URL url = new URL(targetURL);
+ connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
+
+ connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
+ connection.setRequestProperty("Content-Language", "en-US");
+
+ connection.setUseCaches(false);
+ connection.setDoOutput(true);
+
+ if (connection.getOutputStream() != null) {
+ //Send message
+ DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
+ wr.writeBytes(urlParameters);
+ wr.close();
+ } else {
+ LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ int responseCode = connection.getResponseCode();
+ LOGGER.info("\nSending 'POST' request to URL : " + url);
+ LOGGER.info("Post parameters : " + urlParameters);
+ LOGGER.info("Response Code : " + responseCode);
+ }
+
+ if (connection.getInputStream() != null) {
+ BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+ String inputLine;
+ StringBuffer response = new StringBuffer();
+ while ((inputLine = in.readLine()) != null) {
+ response.append(inputLine);
+ }
+ in.close();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO, response.toString());
+ }
+ } else {
+ LOGGER.log(Level.WARNING, "Channel Failed to get response from Broker.");
+ }
+
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
+ } finally {
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ChannelJobService";
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
new file mode 100644
index 0000000..0a6ced2
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.compiler.provider.IRuleSetFactory;
+import org.apache.asterix.lang.aql.rewrites.AQLRewriterFactory;
+import org.apache.asterix.lang.aql.visitor.AQLAstPrintVisitorFactory;
+import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
+import org.apache.asterix.lang.common.base.IParserFactory;
+import org.apache.asterix.lang.common.base.IRewriterFactory;
+import org.apache.asterix.translator.AqlExpressionToPlanTranslatorFactory;
+
+public class BADCompilationProvider implements ILangCompilationProvider {
+
+ @Override
+ public IParserFactory getParserFactory() {
+ return new BADParserFactory();
+ }
+
+ @Override
+ public IRewriterFactory getRewriterFactory() {
+ return new AQLRewriterFactory();
+ }
+
+ @Override
+ public IAstPrintVisitorFactory getAstPrintVisitorFactory() {
+ return new AQLAstPrintVisitorFactory();
+ }
+
+ @Override
+ public ILangExpressionToPlanTranslatorFactory getExpressionToPlanTranslatorFactory() {
+ return new AqlExpressionToPlanTranslatorFactory();
+ }
+
+ @Override
+ public IRuleSetFactory getRuleSetFactory() {
+ return new BADRuleSetFactory();
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
new file mode 100644
index 0000000..959600f
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.bad.metadata.BrokerSearchKey;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.ChannelSearchKey;
+import org.apache.asterix.bad.metadata.DataverseBrokersSearchKey;
+import org.apache.asterix.bad.metadata.DataverseChannelsSearchKey;
+import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.bad.metadata.ProcedureSearchKey;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+public class BADLangExtension implements ILangExtension {
+
+ public static final ExtensionId EXTENSION_ID = new ExtensionId(BADLangExtension.class.getSimpleName(), 0);
+
+ @Override
+ public ExtensionId getId() {
+ return EXTENSION_ID;
+ }
+
+ @Override
+ public void configure(List<Pair<String, String>> args) {
+ }
+
+ @Override
+ public ILangCompilationProvider getLangCompilationProvider(Language lang) {
+ switch (lang) {
+ case AQL:
+ return new BADCompilationProvider();
+ case SQLPP:
+ return new SqlppCompilationProvider();
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ public ExtensionKind getExtensionKind() {
+ return ExtensionKind.LANG;
+ }
+
+
+ public static Broker getBroker(MetadataTransactionContext mdTxnCtx, String dataverseName, String brokerName)
+ throws AlgebricksException {
+ BrokerSearchKey brokerSearchKey = new BrokerSearchKey(dataverseName, brokerName);
+ List<Broker> brokers = MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey);
+ if (brokers.isEmpty()) {
+ return null;
+ } else if (brokers.size() > 1) {
+ throw new AlgebricksException("Broker search key returned more than one broker");
+ } else {
+ return brokers.get(0);
+ }
+ }
+
+ public static Channel getChannel(MetadataTransactionContext mdTxnCtx, String dataverseName, String channelName)
+ throws AlgebricksException {
+ ChannelSearchKey channelSearchKey = new ChannelSearchKey(dataverseName, channelName);
+ List<Channel> channels = MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
+ if (channels.isEmpty()) {
+ return null;
+ } else if (channels.size() > 1) {
+ throw new AlgebricksException("Channel search key returned more than one channel");
+ } else {
+ return channels.get(0);
+ }
+ }
+
+ public static Procedure getProcedure(MetadataTransactionContext mdTxnCtx, String dataverseName,
+ String procedureName, String arity) throws AlgebricksException {
+ ProcedureSearchKey procedureSearchKey = new ProcedureSearchKey(dataverseName, procedureName, arity);
+ List<Procedure> procedures = MetadataManager.INSTANCE.getEntities(mdTxnCtx, procedureSearchKey);
+ if (procedures.isEmpty()) {
+ return null;
+ } else if (procedures.size() > 1) {
+ throw new AlgebricksException("Procedure search key returned more than one channel");
+ } else {
+ return procedures.get(0);
+ }
+ }
+
+ public static List<Broker> getBrokers(MetadataTransactionContext mdTxnCtx, String dataverseName)
+ throws AlgebricksException {
+ DataverseBrokersSearchKey brokerSearchKey = new DataverseBrokersSearchKey(dataverseName);
+ return MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey);
+ }
+
+ public static List<Channel> getChannels(MetadataTransactionContext mdTxnCtx, String dataverseName)
+ throws AlgebricksException {
+ DataverseChannelsSearchKey channelSearchKey = new DataverseChannelsSearchKey(dataverseName);
+ return MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java
new file mode 100644
index 0000000..58bca17
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.io.Reader;
+
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
+
+public class BADParserFactory implements IParserFactory {
+
+ @Override
+ public IParser createParser(String query) {
+ return new BADAQLParser(query);
+ }
+
+ @Override
+ public IParser createParser(Reader reader) {
+ return new BADAQLParser(reader);
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
new file mode 100644
index 0000000..20519dd
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.app.cc.IStatementExecutorExtension;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+public class BADQueryTranslatorExtension implements IStatementExecutorExtension {
+
+ public static final ExtensionId BAD_QUERY_TRANSLATOR_EXTENSION_ID = new ExtensionId(
+ BADQueryTranslatorExtension.class.getSimpleName(), 0);
+
+ private static class LazyHolder {
+ private static final IStatementExecutorFactory INSTANCE = new BADQueryTranslatorFactory();
+
+ }
+
+ @Override
+ public ExtensionId getId() {
+ return BAD_QUERY_TRANSLATOR_EXTENSION_ID;
+ }
+
+ @Override
+ public void configure(List<Pair<String, String>> args) {
+ }
+
+ @Override
+ public IStatementExecutorFactory getQueryTranslatorFactory() {
+ return LazyHolder.INSTANCE;
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
new file mode 100644
index 0000000..958b14f
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.translator.SessionConfig;
+
+public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {
+
+ @Override
+ public QueryTranslator create(List<Statement> statements, SessionConfig conf,
+ ILangCompilationProvider compilationProvider) {
+ return new BADStatementExecutor(statements, conf, compilationProvider);
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
new file mode 100644
index 0000000..31d8cd0
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
+import org.apache.asterix.compiler.provider.DefaultRuleSetFactory;
+import org.apache.asterix.compiler.provider.IRuleSetFactory;
+import org.apache.asterix.optimizer.base.RuleCollections;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
+import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class BADRuleSetFactory implements IRuleSetFactory {
+
+ @Override
+ public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites()
+ throws AlgebricksException {
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet = DefaultRuleSetFactory.buildLogical();
+ if (logicalRuleSet.size() != 14) {
+ throw new AlgebricksException("Incorrect RuleSet");
+ }
+ List<IAlgebraicRewriteRule> normalizationCollection = RuleCollections.buildNormalizationRuleCollection();
+
+ for (int i = 0; i < normalizationCollection.size(); i++) {
+ IAlgebraicRewriteRule rule = normalizationCollection.get(i);
+ if (rule instanceof UnnestToDataScanRule) {
+ normalizationCollection.add(i + 1, new InsertBrokerNotifierForChannelRule());
+ break;
+ }
+ }
+ SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
+ logicalRuleSet.set(3, new Pair<>(seqOnceCtrl, normalizationCollection));
+ logicalRuleSet.set(7, new Pair<>(seqOnceCtrl, normalizationCollection));
+ return logicalRuleSet;
+ }
+
+ @Override
+ public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites() {
+ return DefaultRuleSetFactory.buildPhysical();
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
new file mode 100644
index 0000000..fa18867
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.DataverseDropStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+
+public class BADStatementExecutor extends QueryTranslator {
+
+ public BADStatementExecutor(List<Statement> aqlStatements, SessionConfig conf,
+ ILangCompilationProvider compliationProvider) {
+ super(aqlStatements, conf, compliationProvider);
+ }
+
+
+ @Override
+ protected void handleDataverseDropStatement(MetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc) throws Exception {
+ //TODO: Remove this when metadata dependencies are in place
+ //TODO: Stop dataset drop when dataset used by channel
+ super.handleDataverseDropStatement(metadataProvider, stmt, hcc);
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
+ List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
+ for (Broker broker : brokers) {
+ BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false);
+ drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+ }
+ List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue());
+ for (Channel channel : channels) {
+ ChannelDropStatement drop = new ChannelDropStatement(dvId,
+ new Identifier(channel.getChannelId().getEntityName()), false);
+ drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
new file mode 100644
index 0000000..7894c44
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class BrokerDropStatement implements IExtensionStatement {
+
+ private final Identifier dataverseName;
+ private final Identifier brokerName;
+ private boolean ifExists;
+
+ public BrokerDropStatement(Identifier dataverseName, Identifier brokerName, boolean ifExists) {
+ this.brokerName = brokerName;
+ this.dataverseName = dataverseName;
+ this.ifExists = ifExists;
+ }
+
+ public boolean getIfExists() {
+ return ifExists;
+ }
+
+ public Identifier getDataverseName() {
+ return dataverseName;
+ }
+
+ public Identifier getBrokerName() {
+ return brokerName;
+ }
+
+ @Override
+ public byte getKind() {
+ return Kind.EXTENSION;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.DDL;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+ return null;
+ }
+
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+ int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+ //TODO: dont drop a broker that's being used
+ String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ Broker broker = BADLangExtension.getBroker(mdTxnCtx, dataverse, brokerName.getValue());
+ if (broker == null) {
+ throw new AlgebricksException("A broker with this name " + brokerName + " doesn't exist.");
+ }
+ MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, broker);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ QueryTranslator.abort(e, e, mdTxnCtx);
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
new file mode 100644
index 0000000..6811ef2
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorNodePushable;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.lang.common.statement.DropDatasetStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ChannelDropStatement implements IExtensionStatement {
+
+ private final Identifier dataverseName;
+ private final Identifier channelName;
+ private boolean ifExists;
+
+ public ChannelDropStatement(Identifier dataverseName, Identifier channelName, boolean ifExists) {
+ this.dataverseName = dataverseName;
+ this.channelName = channelName;
+ this.ifExists = ifExists;
+ }
+
+ public Identifier getDataverseName() {
+ return dataverseName;
+ }
+
+ public Identifier getChannelName() {
+ return channelName;
+ }
+
+ public boolean getIfExists() {
+ return ifExists;
+ }
+
+ @Override
+ public byte getKind() {
+ return Kind.EXTENSION;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.DDL;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+ return null;
+ }
+
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+ int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+ String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+ boolean txnActive = false;
+ EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
+ ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
+ .getActiveEntityListener(entityId);
+ IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
+ boolean subscriberRegistered = false;
+ Channel channel = null;
+
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ txnActive = true;
+ channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+ txnActive = false;
+ if (channel == null) {
+ if (ifExists) {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ } else {
+ throw new AlgebricksException("There is no channel with this name " + channelName + ".");
+ }
+ }
+ if (listener != null) {
+ subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+ }
+ if (!subscriberRegistered) {
+ throw new AsterixException("Channel " + channelName + " is not running");
+ }
+
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
+ .getMessageBroker();
+
+ ChannelJobInfo cInfo = listener.getJobInfo(channel.getChannelId());;
+ Set<String> ncs = new HashSet<>(cInfo.getLocations());
+ AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(
+ ncs.toArray(new String[ncs.size()]));
+ int partition = 0;
+ for (String location : locations.getLocations()) {
+ messageBroker.sendApplicationMessageToNC(
+ new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "cc",
+ new ActiveRuntimeId(channel.getChannelId(),
+ RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition++)),
+ location);
+ }
+ eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_ENDED);
+
+ //Drop the Channel Datasets
+ //TODO: Need to find some way to handle if this fails.
+ //TODO: Prevent datasets for Channels from being dropped elsewhere
+ DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
+ new Identifier(channel.getResultsDatasetName()), true);
+ ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+
+ dropStmt = new DropDatasetStatement(new Identifier(dataverse),
+ new Identifier(channel.getSubscriptionsDataset()), true);
+ ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+
+ if (subscriberRegistered) {
+ listener.deregisterEventSubscriber(eventSubscriber);
+ }
+
+ //Remove the Channel Metadata
+ MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (txnActive) {
+ QueryTranslator.abort(e, e, mdTxnCtx);
+ }
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
new file mode 100644
index 0000000..dc10742
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.aql.expression.FLWOGRExpression;
+import org.apache.asterix.lang.common.base.Clause;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.clause.LetClause;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.FieldAccessor;
+import org.apache.asterix.lang.common.expression.FieldBinding;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.UpsertStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ChannelSubscribeStatement implements IExtensionStatement {
+
+ private final Identifier dataverseName;
+ private final Identifier channelName;
+ private final Identifier brokerDataverseName;
+ private final Identifier brokerName;
+ private final List<Expression> argList;
+ private final String subscriptionId;
+ private final int varCounter;
+
+ public ChannelSubscribeStatement(Identifier dataverseName, Identifier channelName, List<Expression> argList,
+ int varCounter, Identifier brokerDataverseName, Identifier brokerName, String subscriptionId) {
+ this.channelName = channelName;
+ this.dataverseName = dataverseName;
+ this.brokerDataverseName = brokerDataverseName;
+ this.brokerName = brokerName;
+ this.argList = argList;
+ this.subscriptionId = subscriptionId;
+ this.varCounter = varCounter;
+ }
+
+ public Identifier getDataverseName() {
+ return dataverseName;
+ }
+
+ public Identifier getBrokerDataverseName() {
+ return brokerDataverseName;
+ }
+
+ public Identifier getChannelName() {
+ return channelName;
+ }
+
+ public Identifier getBrokerName() {
+ return brokerName;
+ }
+
+ public List<Expression> getArgList() {
+ return argList;
+ }
+
+ public int getVarCounter() {
+ return varCounter;
+ }
+
+ public String getSubscriptionId() {
+ return subscriptionId;
+ }
+
+ @Override
+ public byte getKind() {
+ return Kind.EXTENSION;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.QUERY;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+ return null;
+ }
+
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+ int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+ String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+ String brokerDataverse = ((QueryTranslator) statementExecutor)
+.getActiveDataverse(brokerDataverseName);
+
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+ Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+ if (channel == null) {
+ throw new AsterixException("There is no channel with this name " + channelName + ".");
+ }
+ Broker broker = BADLangExtension.getBroker(mdTxnCtx, brokerDataverse, brokerName.getValue());
+ if (broker == null) {
+ throw new AsterixException("There is no broker with this name " + brokerName + ".");
+ }
+
+ String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+
+ if (argList.size() != channel.getFunction().getArity()) {
+ throw new AsterixException("Channel expected " + channel.getFunction().getArity()
+ + " parameters but got " + argList.size());
+ }
+
+ Query subscriptionTuple = new Query(false);
+
+ List<FieldBinding> fb = new ArrayList<FieldBinding>();
+ LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.DataverseName));
+ Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse));
+ fb.add(new FieldBinding(leftExpr, rightExpr));
+
+ leftExpr = new LiteralExpr(new StringLiteral(BADConstants.BrokerName));
+ rightExpr = new LiteralExpr(new StringLiteral(broker.getBrokerName()));
+ fb.add(new FieldBinding(leftExpr, rightExpr));
+
+ if (subscriptionId != null) {
+ leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));
+
+ List<Expression> UUIDList = new ArrayList<Expression>();
+ UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
+ FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
+ FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
+ function.getArity());
+ CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
+
+ rightExpr = UUIDCall;
+ fb.add(new FieldBinding(leftExpr, rightExpr));
+ }
+
+ for (int i = 0; i < argList.size(); i++) {
+ leftExpr = new LiteralExpr(new StringLiteral("param" + i));
+ rightExpr = argList.get(i);
+ fb.add(new FieldBinding(leftExpr, rightExpr));
+ }
+ RecordConstructor recordCon = new RecordConstructor(fb);
+ subscriptionTuple.setBody(recordCon);
+
+ subscriptionTuple.setVarCounter(varCounter);
+
+ if (subscriptionId == null) {
+
+ VariableExpr subscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
+ VariableExpr useSubscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
+ VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
+ VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0));
+ useResultVar.setIsNewVar(false);
+ useSubscriptionVar.setIsNewVar(false);
+ Query returnQuery = new Query(false);
+ List<Clause> clauseList = new ArrayList<>();
+ LetClause let = new LetClause(subscriptionVar,
+ new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId)));
+ clauseList.add(let);
+ FLWOGRExpression body = new FLWOGRExpression(clauseList, useSubscriptionVar);
+ returnQuery.setBody(body);
+
+ metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+ metadataProvider.setResultAsyncMode(
+ resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+ InsertStatement insert = new InsertStatement(new Identifier(dataverse),
+ new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar,
+ returnQuery);
+ ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc, hdc,
+ resultDelivery, stats, false);
+ } else {
+ UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
+ new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
+ ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc, hdc,
+ resultDelivery, stats, false);
+ }
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ QueryTranslator.abort(e, e, mdTxnCtx);
+ throw new HyracksDataException(e);
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
new file mode 100644
index 0000000..17a54ec
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.aql.visitor.AqlDeleteRewriteVisitor;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.FieldAccessor;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.OperatorExpr;
+import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.DeleteStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ChannelUnsubscribeStatement implements IExtensionStatement {
+
+ private final Identifier dataverseName;
+ private final Identifier channelName;
+ private final String subscriptionId;
+ private final int varCounter;
+ private VariableExpr vars;
+ private List<String> dataverses;
+ private List<String> datasets;
+
+ public ChannelUnsubscribeStatement(VariableExpr vars, Identifier dataverseName, Identifier channelName,
+ String subscriptionId, int varCounter, List<String> dataverses, List<String> datasets) {
+ this.vars = vars;
+ this.channelName = channelName;
+ this.dataverseName = dataverseName;
+ this.subscriptionId = subscriptionId;
+ this.varCounter = varCounter;
+ this.dataverses = dataverses;
+ this.datasets = datasets;
+ }
+
+ public Identifier getDataverseName() {
+ return dataverseName;
+ }
+
+ public VariableExpr getVariableExpr() {
+ return vars;
+ }
+
+ public Identifier getChannelName() {
+ return channelName;
+ }
+
+ public String getsubScriptionId() {
+ return subscriptionId;
+ }
+
+ public List<String> getDataverses() {
+ return dataverses;
+ }
+
+ public List<String> getDatasets() {
+ return datasets;
+ }
+
+ public int getVarCounter() {
+ return varCounter;
+ }
+
+ @Override
+ public byte getKind() {
+ return Kind.EXTENSION;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.UPDATE;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+ return null;
+ }
+
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+ int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+ String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+ Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+ if (channel == null) {
+ throw new AsterixException("There is no channel with this name " + channelName + ".");
+ }
+
+ String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+
+ //Need a condition to say subscription-id = sid
+ OperatorExpr condition = new OperatorExpr();
+ FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.SubscriptionId));
+ condition.addOperand(fa);
+ condition.setCurrentop(true);
+ condition.addOperator("=");
+
+ List<Expression> UUIDList = new ArrayList<Expression>();
+ UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
+
+ FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
+ FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
+ function.getArity());
+ CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
+
+ condition.addOperand(UUIDCall);
+
+ DeleteStatement delete = new DeleteStatement(vars, new Identifier(dataverse),
+ new Identifier(subscriptionsDatasetName), condition, varCounter, dataverses, datasets);
+ AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
+ delete.accept(visitor, null);
+
+ ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ QueryTranslator.abort(e, e, mdTxnCtx);
+ throw new HyracksDataException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
new file mode 100644
index 0000000..02389f1
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class CreateBrokerStatement implements IExtensionStatement {
+
+ private static final Logger LOGGER = Logger.getLogger(CreateBrokerStatement.class.getName());
+ private final Identifier dataverseName;
+ private final Identifier brokerName;
+ private String endPointName;
+
+ public CreateBrokerStatement(Identifier dataverseName, Identifier brokerName, String endPointName) {
+ this.brokerName = brokerName;
+ this.dataverseName = dataverseName;
+ this.endPointName = endPointName;
+ }
+
+ public String getEndPointName() {
+ return endPointName;
+ }
+
+ public Identifier getDataverseName() {
+ return dataverseName;
+ }
+
+ public Identifier getBrokerName() {
+ return brokerName;
+ }
+
+ @Override
+ public byte getKind() {
+ return Kind.EXTENSION;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.DDL;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+ return null;
+ }
+
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+ int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+ String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ Broker broker = BADLangExtension.getBroker(mdTxnCtx, dataverse, brokerName.getValue());
+ if (broker != null) {
+ throw new AlgebricksException("A broker with this name " + brokerName + " already exists.");
+ }
+ broker = new Broker(dataverse, brokerName.getValue(), endPointName);
+ MetadataManager.INSTANCE.addEntity(mdTxnCtx, broker);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (mdTxnCtx != null) {
+ QueryTranslator.abort(e, e, mdTxnCtx);
+ }
+ LOGGER.log(Level.WARNING, "Failed creating a broker", e);
+ throw new HyracksDataException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
new file mode 100644
index 0000000..77de93e
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.file.JobSpecificationUtils;
+import org.apache.asterix.lang.aql.parser.AQLParserFactory;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.DatasetDecl;
+import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
+import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.om.base.temporal.ADurationParserFactory;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.util.JobUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
+
+public class CreateChannelStatement implements IExtensionStatement {
+
+ private static final Logger LOGGER = Logger.getLogger(CreateChannelStatement.class.getName());
+
+ private final Identifier dataverseName;
+ private final Identifier channelName;
+ private final FunctionSignature function;
+ private final CallExpr period;
+ private String duration;
+ private InsertStatement channelResultsInsertQuery;
+ private String subscriptionsTableName;
+ private String resultsTableName;
+ private boolean distributed;
+
+ public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
+ Expression period, boolean distributed) {
+ this.channelName = channelName;
+ this.dataverseName = dataverseName;
+ this.function = function;
+ this.period = (CallExpr) period;
+ this.duration = "";
+ this.distributed = distributed;
+ }
+
+ public Identifier getDataverseName() {
+ return dataverseName;
+ }
+
+ public Identifier getChannelName() {
+ return channelName;
+ }
+
+ public String getResultsName() {
+ return resultsTableName;
+ }
+
+ public String getSubscriptionsName() {
+ return subscriptionsTableName;
+ }
+
+ public String getDuration() {
+ return duration;
+ }
+
+ public FunctionSignature getFunction() {
+ return function;
+ }
+
+ public Expression getPeriod() {
+ return period;
+ }
+
+ public InsertStatement getChannelResultsInsertQuery() {
+ return channelResultsInsertQuery;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.DDL;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+ return null;
+ }
+
+ public void initialize(MetadataTransactionContext mdTxnCtx, String subscriptionsTableName, String resultsTableName)
+ throws MetadataException, HyracksDataException {
+ Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function);
+ if (lookup == null) {
+ throw new MetadataException(" Unknown function " + function.getName());
+ }
+
+ if (!period.getFunctionSignature().getName().equals("duration")) {
+ throw new MetadataException(
+ "Expected argument period as a duration, but got " + period.getFunctionSignature().getName() + ".");
+ }
+ duration = ((StringLiteral) ((LiteralExpr) period.getExprList().get(0)).getValue()).getValue();
+ IValueParser durationParser = ADurationParserFactory.INSTANCE.createValueParser();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(bos);
+ durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
+ this.resultsTableName = resultsTableName;
+ this.subscriptionsTableName = subscriptionsTableName;
+
+ }
+
+ @Override
+ public byte getKind() {
+ return Kind.EXTENSION;
+ }
+
+ public Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildChannelJobSpec(String dataverse,
+ String channelName, String duration, MetadataProvider metadataProvider, JobSpecification channeljobSpec,
+ String strIP, int port) throws Exception {
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ IOperatorDescriptor channelQueryExecuter;
+ AlgebricksPartitionConstraint executerPc;
+
+ Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> p = buildChannelRuntime(spec, dataverse,
+ channelName, duration, channeljobSpec, strIP, port);
+ channelQueryExecuter = p.first;
+ executerPc = p.second;
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, channelQueryExecuter, executerPc);
+ spec.addRoot(channelQueryExecuter);
+ return new Pair<>(spec, p.second);
+
+ }
+
+ public Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> buildChannelRuntime(
+ JobSpecification jobSpec, String dataverse, String channelName, String duration,
+ JobSpecification channeljobSpec, String strIP, int port) throws Exception {
+ RepetitiveChannelOperatorDescriptor channelOp = new RepetitiveChannelOperatorDescriptor(jobSpec, dataverse,
+ channelName, duration, channeljobSpec, strIP, port);
+
+ String partition = ClusterStateManager.INSTANCE.getClusterLocations().getLocations()[0];
+ Set<String> ncs = new HashSet<>(Arrays.asList(partition));
+ AlgebricksAbsolutePartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+ ncs.toArray(new String[ncs.size()]));
+ return new Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint>(channelOp, partitionConstraint);
+ }
+
+ private void createDatasets(IStatementExecutor statementExecutor, Identifier subscriptionsName,
+ Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
+ IHyracksDataset hdc, Stats stats, String dataverse) throws AsterixException, Exception {
+
+ Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
+ Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
+ //Setup the subscriptions dataset
+ List<List<String>> partitionFields = new ArrayList<List<String>>();
+ List<Integer> keyIndicators = new ArrayList<Integer>();
+ keyIndicators.add(0);
+ List<String> fieldNames = new ArrayList<String>();
+ fieldNames.add(BADConstants.SubscriptionId);
+ partitionFields.add(fieldNames);
+ IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
+ DatasetDecl createSubscriptionsDataset = new DatasetDecl(new Identifier(dataverse), subscriptionsName,
+ new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null, null,
+ new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
+
+ //Setup the results dataset
+ partitionFields = new ArrayList<List<String>>();
+ fieldNames = new ArrayList<String>();
+ fieldNames.add(BADConstants.ResultId);
+ partitionFields.add(fieldNames);
+ idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
+ DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName,
+ new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
+ new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
+
+ //Run both statements to create datasets
+ ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
+ hcc);
+ ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
+
+ }
+
+ private JobSpecification createChannelJob(IStatementExecutor statementExecutor, Identifier subscriptionsName,
+ Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
+ IHyracksDataset hdc, Stats stats, String dataverse) throws Exception {
+ StringBuilder builder = new StringBuilder();
+ builder.append("insert into dataset " + dataverse + "." + resultsName + " ");
+ builder.append(" as $a (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime() \n");
+
+ builder.append("for $sub in dataset " + dataverse + "." + subscriptionsName + "\n");
+ builder.append(
+ "for $broker in dataset " + BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + "\n");
+ builder.append("where $broker." + BADConstants.BrokerName + "= $sub." + BADConstants.BrokerName + "\n");
+ builder.append("and $broker." + BADConstants.DataverseName + "= $sub." + BADConstants.DataverseName + "\n");
+ builder.append(" for $result in " + function.getNamespace() + "." + function.getName() + "(");
+ int i = 0;
+ for (; i < function.getArity() - 1; i++) {
+ builder.append("$sub.param" + i + ",");
+ }
+ builder.append("$sub.param" + i + ")\n");
+ builder.append("return {\n");
+ builder.append("\"" + BADConstants.ChannelExecutionTime + "\":$" + BADConstants.ChannelExecutionTime + ",");
+ builder.append("\"" + BADConstants.SubscriptionId + "\":$sub." + BADConstants.SubscriptionId + ",");
+ builder.append("\"" + BADConstants.DeliveryTime + "\":current-datetime(),");
+ builder.append("\"result\":$result");
+ builder.append("}");
+ builder.append(")");
+ builder.append(" returning $a");
+ builder.append(";");
+ AQLParserFactory aqlFact = new AQLParserFactory();
+ List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
+ return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0),
+ hcc, hdc, ResultDelivery.ASYNC, stats, true);
+ }
+
+ private void setupCompiledJob(MetadataProvider metadataProvider, String dataverse, EntityId entityId,
+ JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception {
+ ICCApplicationContext iCCApp = AsterixAppContextInfo.INSTANCE.getCCApplicationContext();
+ ClusterControllerInfo ccInfo = iCCApp.getCCContext().getClusterControllerInfo();
+ String strIP = ccInfo.getClientNetAddress();
+ int port = ccInfo.getClientNetPort();
+ //Create Channel Operator
+ Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> alteredJobSpec = buildChannelJobSpec(dataverse,
+ channelName.getValue(), duration, metadataProvider, channeljobSpec, strIP, port);
+
+ ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE, alteredJobSpec.first);
+ alteredJobSpec.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
+ JobUtils.runJob(hcc, alteredJobSpec.first, false);
+ }
+
+ private void setupDistributedJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc)
+ throws Exception {
+ ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE, channeljobSpec);
+ channeljobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
+ JobId jobId = hcc.startJob(channeljobSpec, EnumSet.of(JobFlag.STORE_JOB));
+ ChannelJobService.startJob(channeljobSpec, EnumSet.of(JobFlag.STORE_JOB), jobId, hcc,
+ ChannelJobService.findPeriod(duration));
+ }
+
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+ int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+ //This function performs three tasks:
+ //1. Create datasets for the Channel
+ //2. Create and run the Channel Job
+ //3. Create the metadata entry for the channel
+
+ //TODO: Figure out how to handle when a subset of the 3 tasks fails
+ //TODO: The compiled job will break if anything changes on the function or two datasets
+ // Need to make sure we do proper checking when altering these things
+
+ String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+
+ Identifier subscriptionsName = new Identifier(channelName + BADConstants.subscriptionEnding);
+ Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
+ EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
+ ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
+ .getActiveEntityListener(entityId);
+ IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
+ boolean subscriberRegistered = false;
+ Channel channel = null;
+
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+ if (channel != null) {
+ throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
+ }
+ if (listener != null) {
+ subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+ }
+ if (subscriberRegistered) {
+ throw new AsterixException("Channel " + channelName + " is already running");
+ }
+ initialize(mdTxnCtx, subscriptionsName.getValue(), resultsName.getValue());
+ channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
+ duration);
+
+ //check if names are available before creating anything
+ if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()) != null) {
+ throw new AsterixException("The channel name:" + channelName + " is not available.");
+ }
+ if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()) != null) {
+ throw new AsterixException("The channel name:" + channelName + " is not available.");
+ }
+
+ // Now we subscribe
+ if (listener == null) {
+ listener = new ChannelEventsListener(entityId);
+ ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+ }
+ listener.registerEventSubscriber(eventSubscriber);
+ subscriberRegistered = true;
+
+ //Create Channel Datasets
+ createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats,
+ dataverse);
+
+ //Create Channel Internal Job
+ JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
+ metadataProvider, hcc, hdc, stats, dataverse);
+
+ if (distributed) {
+ setupDistributedJob(entityId, channeljobSpec, hcc);
+ } else {
+ setupCompiledJob(metadataProvider, dataverse, entityId, channeljobSpec, hcc);
+ }
+ eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+
+ MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (mdTxnCtx != null) {
+ QueryTranslator.abort(e, e, mdTxnCtx);
+ }
+ LOGGER.log(Level.WARNING, "Failed creating a channel", e);
+ throw new HyracksDataException(e);
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
new file mode 100644
index 0000000..3edc7dc
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.lang.aql.parser.AQLParserFactory;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class CreateProcedureStatement implements IExtensionStatement {
+
+ private static final Logger LOGGER = Logger.getLogger(CreateProcedureStatement.class.getName());
+
+ private final FunctionSignature signature;
+ private final String functionBody;
+ private final List<String> paramList;
+
+ public FunctionSignature getaAterixFunction() {
+ return signature;
+ }
+
+ public String getFunctionBody() {
+ return functionBody;
+ }
+
+ public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList,
+ String functionBody) {
+ this.signature = signature;
+ this.functionBody = functionBody;
+ this.paramList = new ArrayList<String>();
+ for (VarIdentifier varId : parameterList) {
+ this.paramList.add(varId.getValue());
+ }
+ }
+
+ @Override
+ public byte getKind() {
+ return Kind.EXTENSION;
+ }
+
+ public List<String> getParamList() {
+ return paramList;
+ }
+
+ public FunctionSignature getSignature() {
+ return signature;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.DDL;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+ return null;
+ }
+
+ private JobSpecification createProcedureJob(String body, IStatementExecutor statementExecutor,
+ MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats)
+ throws Exception {
+ StringBuilder builder = new StringBuilder();
+ builder.append(body);
+ builder.append(";");
+ AQLParserFactory aqlFact = new AQLParserFactory();
+ List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
+ if (fStatements.size() > 1) {
+ throw new Exception("Procedure can only execute a single statement");
+ }
+ return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0),
+ hcc, hdc, ResultDelivery.ASYNC, stats, true);
+ }
+
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+ int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+ String dataverse =
+ ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
+
+ EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
+ ChannelEventsListener listener =
+ (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+ IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
+ boolean subscriberRegistered = false;
+ Procedure procedure = null;
+
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(),
+ Integer.toString(signature.getArity()));
+ if (procedure != null) {
+ throw new AlgebricksException("A procedure with this name " + signature.getName() + " already exists.");
+ }
+ if (listener != null) {
+ subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+ }
+ if (subscriberRegistered) {
+ throw new AsterixException("Procedure " + signature.getName() + " is already running");
+ }
+
+ procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
+ Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL);
+
+ // Now we subscribe
+ if (listener == null) {
+ listener = new ChannelEventsListener(entityId);
+ ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+ }
+ listener.registerEventSubscriber(eventSubscriber);
+ subscriberRegistered = true;
+
+
+ //Create Procedure Internal Job
+ JobSpecification channeljobSpec =
+ createProcedureJob(getFunctionBody(), statementExecutor, metadataProvider, hcc, hdc, stats);
+
+ // setupDistributedJob(entityId, channeljobSpec, hcc);
+
+ eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+
+ MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (mdTxnCtx != null) {
+ QueryTranslator.abort(e, e, mdTxnCtx);
+ }
+ LOGGER.log(Level.WARNING, "Failed creating a procedure", e);
+ throw new HyracksDataException(e);
+ }
+
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
new file mode 100644
index 0000000..7222b1a
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.rmi.RemoteException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
+import org.apache.asterix.metadata.api.IMetadataEntity;
+import org.apache.asterix.metadata.api.IMetadataExtension;
+import org.apache.asterix.metadata.api.IMetadataIndex;
+import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
+import org.apache.asterix.metadata.entities.Datatype;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class BADMetadataExtension implements IMetadataExtension {
+
+ public static final ExtensionId BAD_METADATA_EXTENSION_ID = new ExtensionId(
+ BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
+ public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
+ NonTaggedDataFormat.class.getName(), IMetadataEntity.PENDING_NO_OP);
+
+ public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
+ public static final Datatype BAD_RESULT_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
+
+ public static final Datatype BAD_BROKER_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ BADConstants.RECORD_TYPENAME_BROKER, BADMetadataRecordTypes.BROKER_RECORDTYPE, false);
+
+ public static final Datatype BAD_CHANNEL_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ BADConstants.RECORD_TYPENAME_CHANNEL, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, false);
+
+ public static final Datatype BAD_PROCEDURE_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ BADConstants.RECORD_TYPENAME_PROCEDURE, BADMetadataRecordTypes.PROCEDURE_RECORDTYPE, false);
+
+ @Override
+ public ExtensionId getId() {
+ return BAD_METADATA_EXTENSION_ID;
+ }
+
+ @Override
+ public void configure(List<Pair<String, String>> args) {
+ // do nothing??
+ }
+
+ @Override
+ public MetadataTupleTranslatorProvider getMetadataTupleTranslatorProvider() {
+ return new MetadataTupleTranslatorProvider();
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public List<ExtensionMetadataDataset> getExtensionIndexes() {
+ try {
+ return Arrays.asList(BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
+ }
+
+ @Override
+ public void initializeMetadata() throws HyracksDataException, RemoteException, ACIDException {
+ // enlist datasets
+ MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.CHANNEL_DATASET);
+ MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.BROKER_DATASET);
+ MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.PROCEDURE_DATASET);
+ if (MetadataBootstrap.isNewUniverse()) {
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ try {
+ // add metadata datasets
+ MetadataBootstrap.insertMetadataDatasets(mdTxnCtx,
+ new IMetadataIndex[] { BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET,
+ BADMetadataIndexes.PROCEDURE_DATASET });
+ // insert default dataverse
+ // TODO prevent user from dropping this dataverse
+ // MetadataManager.INSTANCE.addDataverse(mdTxnCtx, BAD_DATAVERSE);
+ // insert default data type
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_RESULT_DATATYPE);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_SUBSCRIPTION_DATATYPE);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_DATATYPE);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_DATATYPE);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_PROCEDURE_DATATYPE);
+ // TODO prevent user from dropping these types
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ e.printStackTrace();
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
+ }
+ // local recovery?
+ // nothing for now
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
new file mode 100644
index 0000000..b33dcad
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.Arrays;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+
+public class BADMetadataIndexes {
+
+ public static final ExtensionMetadataDatasetId BAD_CHANNEL_INDEX_ID = new ExtensionMetadataDatasetId(
+ BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.CHANNEL_EXTENSION_NAME);
+ public static final MetadataIndexImmutableProperties PROPERTIES_CHANNEL = new MetadataIndexImmutableProperties(
+ BADConstants.CHANNEL_EXTENSION_NAME,
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID,
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID);
+
+ public static final ExtensionMetadataDatasetId BAD_BROKER_INDEX_ID = new ExtensionMetadataDatasetId(
+ BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.BROKER_KEYWORD);
+ public static final MetadataIndexImmutableProperties PROPERTIES_BROKER = new MetadataIndexImmutableProperties(
+ BADConstants.BROKER_KEYWORD,
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1,
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1);
+
+ public static final ExtensionMetadataDatasetId BAD_PROCEDURE_INDEX_ID = new ExtensionMetadataDatasetId(
+ BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.PROCEDURE_KEYWORD);
+ public static final MetadataIndexImmutableProperties PROPERTIES_PROCEDURE =
+ new MetadataIndexImmutableProperties(BADConstants.PROCEDURE_KEYWORD,
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 2,
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 2);
+
+ public static final int NUM_FIELDS_CHANNEL_IDX = 3;
+ public static final int NUM_FIELDS_BROKER_IDX = 3;
+ public static final int NUM_FIELDS_PROCEDURE_IDX = 4;
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static final ExtensionMetadataDataset CHANNEL_DATASET = new ExtensionMetadataDataset(PROPERTIES_CHANNEL,
+ NUM_FIELDS_CHANNEL_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+ Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+ Arrays.asList(BADConstants.ChannelName)),
+ 0, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, true, new int[] { 0, 1 }, BAD_CHANNEL_INDEX_ID,
+ new ChannelTupleTranslator(true));
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static final ExtensionMetadataDataset BROKER_DATASET = new ExtensionMetadataDataset(PROPERTIES_BROKER,
+ NUM_FIELDS_BROKER_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+ Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+ Arrays.asList(BADConstants.BrokerName)),
+ 0, BADMetadataRecordTypes.BROKER_RECORDTYPE, true, new int[] { 0, 1 }, BAD_BROKER_INDEX_ID,
+ new BrokerTupleTranslator(true));
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static final ExtensionMetadataDataset PROCEDURE_DATASET = new ExtensionMetadataDataset(PROPERTIES_PROCEDURE,
+ NUM_FIELDS_PROCEDURE_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+ Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+ Arrays.asList(BADConstants.ProcedureName), Arrays.asList(BADConstants.FIELD_NAME_ARITY)),
+ 0, BADMetadataRecordTypes.PROCEDURE_RECORDTYPE, true, new int[] { 0, 1, 2 }, BAD_PROCEDURE_INDEX_ID,
+ new ProcedureTupleTranslator(true));
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
new file mode 100644
index 0000000..6ee5735
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+
+public class BADMetadataRecordTypes {
+
+ // -------------------------------------- Subscriptions --------------------------------------//
+ private static final String[] subTypeFieldNames = { BADConstants.DataverseName, BADConstants.BrokerName,
+ BADConstants.SubscriptionId };
+ private static final IAType[] subTypeFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AUUID };
+ public static final ARecordType channelSubscriptionsType = new ARecordType(BADConstants.ChannelSubscriptionsType,
+ subTypeFieldNames, subTypeFieldTypes, true);
+
+ // ---------------------------------------- Results --------------------------------------------//
+ private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime,
+ BADConstants.SubscriptionId, BADConstants.DeliveryTime };
+ private static final IAType[] resultTypeFieldTypes = { BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID,
+ BuiltinType.ADATETIME };
+ public static final ARecordType channelResultsType = new ARecordType(BADConstants.ChannelResultsType,
+ resultTypeFieldNames, resultTypeFieldTypes, true);
+
+ //------------------------------------------ Channel ----------------------------------------//
+ public static final int CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
+ public static final int CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX = 1;
+ public static final int CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
+ public static final int CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX = 3;
+ public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
+ public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
+ public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
+ // RecordTypeName
+ BADConstants.RECORD_TYPENAME_CHANNEL,
+ // FieldNames
+ new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
+ BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration },
+ // FieldTypes
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+ BuiltinType.ASTRING, BuiltinType.ASTRING },
+ //IsOpen?
+ true);
+ //------------------------------------------ Broker ----------------------------------------//
+ public static final int BROKER_DATAVERSE_NAME_FIELD_INDEX = 0;
+ public static final int BROKER_NAME_FIELD_INDEX = 1;
+ public static final int BROKER_ENDPOINT_FIELD_INDEX = 2;
+ public static final ARecordType BROKER_RECORDTYPE = MetadataRecordTypes.createRecordType(
+ // RecordTypeName
+ BADConstants.RECORD_TYPENAME_BROKER,
+ // FieldNames
+ new String[] { BADConstants.DataverseName, BADConstants.BrokerName, BADConstants.BrokerEndPoint },
+ // FieldTypes
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+ BuiltinType.ASTRING, BuiltinType.ASTRING },
+ //IsOpen?
+ true);
+
+ //----------------------------------------- Procedure ----------------------------------------//
+ public static final int PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX = 0;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX = 1;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX = 2;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX = 3;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX = 4;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX = 5;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX = 6;
+ public static final ARecordType PROCEDURE_RECORDTYPE = MetadataRecordTypes.createRecordType(
+ // RecordTypeName
+ BADConstants.RECORD_TYPENAME_PROCEDURE,
+ // FieldNames
+ new String[] { BADConstants.DataverseName, BADConstants.ProcedureName, BADConstants.FIELD_NAME_ARITY,
+ BADConstants.FIELD_NAME_PARAMS, BADConstants.FIELD_NAME_RETURN_TYPE,
+ BADConstants.FIELD_NAME_DEFINITION, BADConstants.FIELD_NAME_LANGUAGE },
+ // FieldTypes
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+ new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING, BuiltinType.ASTRING,
+ BuiltinType.ASTRING },
+ //IsOpen?
+ true);
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
new file mode 100644
index 0000000..006f0dc
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2015 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+
+/**
+ * Metadata describing a broker.
+ */
+public class Broker implements IExtensionMetadataEntity {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String dataverseName;
+ private final String brokerName;
+ private final String endPointName;
+
+ public Broker(String dataverseName, String brokerName, String endPointName) {
+ this.endPointName = endPointName;
+ this.dataverseName = dataverseName;
+ this.brokerName = brokerName;
+ }
+
+ public String getDataverseName() {
+ return dataverseName;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public String getEndPointName() {
+ return endPointName;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof Broker)) {
+ return false;
+ }
+ Broker otherDataset = (Broker) other;
+ if (!otherDataset.brokerName.equals(brokerName)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+ }
+}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
new file mode 100644
index 0000000..b73e9e3
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class BrokerSearchKey implements IExtensionMetadataSearchKey {
+ private static final long serialVersionUID = 1L;
+ private final String dataverse;
+ private final String broker;
+
+ public BrokerSearchKey(String dataverse, String broker) {
+ this.dataverse = dataverse;
+ this.broker = broker;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+ }
+
+ @Override
+ public ITupleReference getSearchKey() {
+ return MetadataNode.createTuple(dataverse, broker);
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
new file mode 100644
index 0000000..34397f4
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Channel metadata entity to an ITupleReference and vice versa.
+ */
+public class BrokerTupleTranslator extends AbstractTupleTranslator<Broker> {
+ // Field indexes of serialized Broker in a tuple.
+ // Key field.
+ public static final int BROKER_DATAVERSE_NAME_FIELD_INDEX = 0;
+
+ public static final int BROKER_NAME_FIELD_INDEX = 1;
+
+ // Payload field containing serialized broker.
+ public static final int BROKER_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ARecord> recordSerDes =
+ SerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BADMetadataRecordTypes.BROKER_RECORDTYPE);
+
+ @SuppressWarnings("unchecked")
+ public BrokerTupleTranslator(boolean getTuple) {
+ super(getTuple, BADMetadataIndexes.NUM_FIELDS_BROKER_IDX);
+ }
+
+ @Override
+ public Broker getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
+ byte[] serRecord = frameTuple.getFieldData(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordStartOffset = frameTuple.getFieldStart(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordLength = frameTuple.getFieldLength(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+ ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+ DataInput in = new DataInputStream(stream);
+ ARecord channelRecord = recordSerDes.deserialize(in);
+ return createBrokerFromARecord(channelRecord);
+ }
+
+ private Broker createBrokerFromARecord(ARecord brokerRecord) {
+ Broker broker = null;
+ String dataverseName = ((AString) brokerRecord
+ .getValueByPos(BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
+ String brokerName = ((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX))
+ .getStringValue();
+ String endPointName = ((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX))
+ .getStringValue();
+
+ broker = new Broker(dataverseName, brokerName, endPointName);
+ return broker;
+ }
+
+ @Override
+ public ITupleReference getTupleFromMetadataEntity(Broker broker) throws IOException, MetadataException {
+ // write the key in the first fields of the tuple
+
+ tupleBuilder.reset();
+ aString.setValue(broker.getDataverseName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ aString.setValue(broker.getBrokerName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ recordBuilder.reset(BADMetadataRecordTypes.BROKER_RECORDTYPE);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(broker.getDataverseName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 1
+ fieldValue.reset();
+ aString.setValue(broker.getBrokerName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 2
+ fieldValue.reset();
+ aString.setValue(broker.getEndPointName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX, fieldValue);
+
+ // write record
+ recordBuilder.write(tupleBuilder.getDataOutput(), true);
+
+ tupleBuilder.addFieldEndOffset();
+
+ tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ return tuple;
+ }
+}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
new file mode 100644
index 0000000..b201af6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2015 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+
+/**
+ * Metadata describing a channel.
+ */
+public class Channel implements IExtensionMetadataEntity {
+
+ private static final long serialVersionUID = 1L;
+
+ /** A unique identifier for the channel */
+ protected final EntityId channelId;
+ private final String subscriptionsDatasetName;
+ private final String resultsDatasetName;
+ private final String duration;
+ private final FunctionSignature function;
+
+ public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
+ FunctionSignature function, String duration) {
+ this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
+ this.function = function;
+ this.duration = duration;
+ this.resultsDatasetName = resultsDataset;
+ this.subscriptionsDatasetName = subscriptionsDataset;
+ }
+
+ public EntityId getChannelId() {
+ return channelId;
+ }
+
+ public String getSubscriptionsDataset() {
+ return subscriptionsDatasetName;
+ }
+
+ public String getResultsDatasetName() {
+ return resultsDatasetName;
+ }
+
+ public String getDuration() {
+ return duration;
+ }
+
+ public FunctionSignature getFunction() {
+ return function;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof Channel)) {
+ return false;
+ }
+ Channel otherDataset = (Channel) other;
+ if (!otherDataset.channelId.equals(channelId)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+ }
+}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
new file mode 100644
index 0000000..a3c757b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.log4j.Logger;
+
+public class ChannelEventsListener implements IActiveEntityEventsListener {
+ private static final Logger LOGGER = Logger.getLogger(ChannelEventsListener.class);
+ private final List<IActiveLifecycleEventSubscriber> subscribers;
+ private final Map<Long, ActiveJob> jobs;
+ private final Map<EntityId, ChannelJobInfo> jobInfos;
+ private EntityId entityId;
+
+ public ChannelEventsListener(EntityId entityId) {
+ this.entityId = entityId;
+ subscribers = new ArrayList<>();
+ jobs = new HashMap<>();
+ jobInfos = new HashMap<>();
+ }
+
+ @Override
+ public void notify(ActiveEvent event) {
+ try {
+ switch (event.getEventKind()) {
+ case JOB_START:
+ handleJobStartEvent(event);
+ break;
+ case JOB_FINISH:
+ handleJobFinishEvent(event);
+ break;
+ case PARTITION_EVENT:
+ LOGGER.warn("Partition Channel Event");
+ break;
+ default:
+ break;
+
+ }
+ } catch (Exception e) {
+ LOGGER.error("Unhandled Exception", e);
+ }
+ }
+
+ private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
+ ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+ handleJobStartMessage((ChannelJobInfo) jobInfo);
+ }
+
+ private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
+ ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Channel Job finished for " + jobInfo);
+ }
+ handleJobFinishMessage((ChannelJobInfo) jobInfo);
+ }
+
+ private synchronized void handleJobFinishMessage(ChannelJobInfo cInfo) throws Exception {
+ EntityId channelJobId = cInfo.getEntityId();
+
+ IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
+ JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+ JobStatus status = info.getStatus();
+ boolean failure = status != null && status.equals(JobStatus.FAILURE);
+
+ jobInfos.remove(channelJobId);
+ jobs.remove(cInfo.getJobId().getId());
+ // notify event listeners
+ ActiveLifecycleEvent event = failure ? ActiveLifecycleEvent.ACTIVE_JOB_FAILED
+ : ActiveLifecycleEvent.ACTIVE_JOB_ENDED;
+ notifyEventSubscribers(event);
+ }
+
+ private void notifyEventSubscribers(ActiveLifecycleEvent event) {
+ if (subscribers != null && !subscribers.isEmpty()) {
+ for (IActiveLifecycleEventSubscriber subscriber : subscribers) {
+ subscriber.handleEvent(event);
+ }
+ }
+ }
+
+ private static synchronized void handleJobStartMessage(ChannelJobInfo cInfo) throws Exception {
+ List<OperatorDescriptorId> channelOperatorIds = new ArrayList<>();
+ Map<OperatorDescriptorId, IOperatorDescriptor> operators = cInfo.getSpec().getOperatorMap();
+ for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+ IOperatorDescriptor opDesc = entry.getValue();
+ if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
+ channelOperatorIds.add(opDesc.getOperatorId());
+ }
+ }
+
+ IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
+ JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+ List<String> locations = new ArrayList<>();
+ for (OperatorDescriptorId channelOperatorId : channelOperatorIds) {
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(channelOperatorId);
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ locations.add(operatorLocations.get(i));
+ }
+ }
+ cInfo.setLocations(locations);
+ cInfo.setState(ActivityState.ACTIVE);
+ }
+
+ @Override
+ public void notifyJobCreation(JobId jobId, JobSpecification spec) {
+ try {
+ registerJob(jobId, spec);
+ return;
+
+ } catch (Exception e) {
+ LOGGER.error(e);
+ }
+ }
+
+ public synchronized void registerJob(JobId jobId, JobSpecification jobSpec) {
+ if (jobs.get(jobId.getId()) != null) {
+ throw new IllegalStateException("Channel job already registered");
+ }
+ if (jobInfos.containsKey(jobId.getId())) {
+ throw new IllegalStateException("Channel job already registered");
+ }
+
+ ChannelJobInfo cInfo = new ChannelJobInfo(entityId, jobId, ActivityState.CREATED, jobSpec);
+ jobs.put(jobId.getId(), cInfo);
+ jobInfos.put(entityId, cInfo);
+
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Registered channel job [" + jobId + "]" + " for channel " + entityId);
+ }
+
+ notifyEventSubscribers(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+
+ }
+
+ public JobSpecification getJobSpecification(EntityId activeJobId) {
+ return jobInfos.get(activeJobId).getSpec();
+ }
+
+ public ChannelJobInfo getJobInfo(EntityId activeJobId) {
+ return jobInfos.get(activeJobId);
+ }
+
+ public synchronized void registerEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
+ subscribers.add(subscriber);
+ }
+
+ public void deregisterEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
+ subscribers.remove(subscriber);
+ }
+
+ public synchronized boolean isChannelActive(EntityId activeJobId, IActiveLifecycleEventSubscriber eventSubscriber) {
+ boolean active = false;
+ ChannelJobInfo cInfo = jobInfos.get(activeJobId);
+ if (cInfo != null) {
+ active = cInfo.getState().equals(ActivityState.ACTIVE);
+ }
+ if (active) {
+ registerEventSubscriber(eventSubscriber);
+ }
+ return active;
+ }
+
+ public FeedConnectionId[] getConnections() {
+ return jobInfos.keySet().toArray(new FeedConnectionId[jobInfos.size()]);
+ }
+
+ @Override
+ public boolean isEntityActive() {
+ return !jobs.isEmpty();
+ }
+
+ @Override
+ public EntityId getEntityId() {
+ return entityId;
+ }
+
+ @Override
+ public boolean isEntityUsingDataset(String dataverseName, String datasetName) {
+ if (entityId.getDataverse().equals(dataverseName)) {
+ String subscriptionsName = entityId.getEntityName() + BADConstants.subscriptionEnding;
+ String resultsName = entityId.getEntityName() + BADConstants.resultsEnding;
+ if (datasetName.equals(subscriptionsName) || datasetName.equals(resultsName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
new file mode 100644
index 0000000..679548c
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class ChannelSearchKey implements IExtensionMetadataSearchKey {
+ private static final long serialVersionUID = 1L;
+ private final String dataverse;
+ private final String channel;
+
+ public ChannelSearchKey(String dataverse, String channel) {
+ this.dataverse = dataverse;
+ this.channel = channel;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+ }
+
+ @Override
+ public ITupleReference getSearchKey() {
+ return MetadataNode.createTuple(dataverse, channel);
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
new file mode 100644
index 0000000..b9ae250
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Channel metadata entity to an ITupleReference and vice versa.
+ */
+public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
+ // Field indexes of serialized Feed in a tuple.
+ // Key field.
+ public static final int CHANNEL_DATAVERSE_NAME_FIELD_INDEX = 0;
+
+ public static final int CHANNEL_NAME_FIELD_INDEX = 1;
+
+ // Payload field containing serialized feed.
+ public static final int CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BADMetadataRecordTypes.CHANNEL_RECORDTYPE);
+
+ @SuppressWarnings("unchecked")
+ public ChannelTupleTranslator(boolean getTuple) {
+ super(getTuple, BADMetadataIndexes.NUM_FIELDS_CHANNEL_IDX);
+ }
+
+ @Override
+ public Channel getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
+ byte[] serRecord = frameTuple.getFieldData(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordStartOffset = frameTuple.getFieldStart(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordLength = frameTuple.getFieldLength(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
+ ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+ DataInput in = new DataInputStream(stream);
+ ARecord channelRecord = recordSerDes.deserialize(in);
+ return createChannelFromARecord(channelRecord);
+ }
+
+ private Channel createChannelFromARecord(ARecord channelRecord) {
+ Channel channel = null;
+ String dataverseName = ((AString) channelRecord
+ .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
+ String channelName = ((AString) channelRecord
+ .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX)).getStringValue();
+ String subscriptionsName = ((AString) channelRecord
+ .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX)).getStringValue();
+ String resultsName = ((AString) channelRecord
+ .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX)).getStringValue();
+ String fName = ((AString) channelRecord
+ .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX)).getStringValue();
+ String duration = ((AString) channelRecord
+ .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DURATION_FIELD_INDEX)).getStringValue();
+
+ FunctionSignature signature = null;
+
+ String[] qnameComponents = fName.split("\\.");
+ String functionDataverse;
+ String functionName;
+ if (qnameComponents.length == 2) {
+ functionDataverse = qnameComponents[0];
+ functionName = qnameComponents[1];
+ } else {
+ functionDataverse = dataverseName;
+ functionName = qnameComponents[0];
+ }
+
+ String[] nameComponents = functionName.split("@");
+ signature = new FunctionSignature(functionDataverse, nameComponents[0], Integer.parseInt(nameComponents[1]));
+
+ channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration);
+ return channel;
+ }
+
+ @Override
+ public ITupleReference getTupleFromMetadataEntity(Channel channel) throws IOException, MetadataException {
+ // write the key in the first fields of the tuple
+
+ tupleBuilder.reset();
+ aString.setValue(channel.getChannelId().getDataverse());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ aString.setValue(channel.getChannelId().getEntityName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ recordBuilder.reset(BADMetadataRecordTypes.CHANNEL_RECORDTYPE);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(channel.getChannelId().getDataverse());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 1
+ fieldValue.reset();
+ aString.setValue(channel.getChannelId().getEntityName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 2
+ fieldValue.reset();
+ aString.setValue(channel.getSubscriptionsDataset());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 3
+ fieldValue.reset();
+ aString.setValue(channel.getResultsDatasetName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 4
+ fieldValue.reset();
+ aString.setValue(channel.getFunction().toString());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX, fieldValue);
+
+ // write field 5
+ fieldValue.reset();
+ aString.setValue(channel.getDuration());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DURATION_FIELD_INDEX, fieldValue);
+
+ // write record
+ recordBuilder.write(tupleBuilder.getDataOutput(), true);
+
+ tupleBuilder.addFieldEndOffset();
+
+ tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ return tuple;
+ }
+}
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
new file mode 100644
index 0000000..527d65b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class DataverseBrokersSearchKey implements IExtensionMetadataSearchKey {
+ private static final long serialVersionUID = 1L;
+ private final String dataverse;
+
+ public DataverseBrokersSearchKey(String dataverse) {
+ this.dataverse = dataverse;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+ }
+
+ @Override
+ public ITupleReference getSearchKey() {
+ return MetadataNode.createTuple(dataverse);
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
new file mode 100644
index 0000000..ffb3ab6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class DataverseChannelsSearchKey implements IExtensionMetadataSearchKey {
+ private static final long serialVersionUID = 1L;
+ private final String dataverse;
+
+ public DataverseChannelsSearchKey(String dataverse) {
+ this.dataverse = dataverse;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+ }
+
+ @Override
+ public ITupleReference getSearchKey() {
+ return MetadataNode.createTuple(dataverse);
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
new file mode 100644
index 0000000..b64bf1b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.List;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+
+public class Procedure implements IExtensionMetadataEntity {
+ private static final long serialVersionUID = 1L;
+ public static final String LANGUAGE_AQL = "AQL";
+ public static final String LANGUAGE_JAVA = "JAVA";
+
+ public static final String RETURNTYPE_VOID = "VOID";
+ public static final String NOT_APPLICABLE = "N/A";
+
+ private final EntityId procedureId;
+ private final int arity;
+ private final List<String> params;
+ private final String body;
+ private final String returnType;
+ private final String language;
+
+ public Procedure(String dataverseName, String functionName, int arity, List<String> params, String returnType,
+ String functionBody, String language) {
+ this.procedureId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, functionName);
+ this.params = params;
+ this.body = functionBody;
+ this.returnType = returnType == null ? RETURNTYPE_VOID : returnType;
+ this.language = language;
+ this.arity = arity;
+ }
+
+ public EntityId getEntityId() {
+ return procedureId;
+ }
+
+ public List<String> getParams() {
+ return params;
+ }
+
+ public String getBody() {
+ return body;
+ }
+
+ public String getReturnType() {
+ return returnType;
+ }
+
+ public String getLanguage() {
+ return language;
+ }
+
+ public int getArity() {
+ return arity;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof Procedure)) {
+ return false;
+ }
+ Procedure otherDataset = (Procedure) other;
+ if (!otherDataset.procedureId.equals(procedureId)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID;
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
new file mode 100644
index 0000000..6456170
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class ProcedureSearchKey implements IExtensionMetadataSearchKey {
+ private static final long serialVersionUID = 1L;
+ private final String dataverse;
+ private final String channel;
+ private final String arity;
+
+ public ProcedureSearchKey(String dataverse, String channel, String arity) {
+ this.dataverse = dataverse;
+ this.channel = channel;
+ this.arity = arity;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID;
+ }
+
+ @Override
+ public ITupleReference getSearchKey() {
+ return MetadataNode.createTuple(dataverse, channel, arity);
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
new file mode 100644
index 0000000..f2eab9b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.IACursor;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Procedure metadata entity to an ITupleReference and vice versa.
+ */
+public class ProcedureTupleTranslator extends AbstractTupleTranslator<Procedure> {
+ // Field indexes of serialized Procedure in a tuple.
+ // First key field.
+ public static final int PROCEDURE_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
+ // Second key field.
+ public static final int PROCEDURE_PROCEDURE_NAME_TUPLE_FIELD_INDEX = 1;
+ // Third key field.
+ public static final int PROCEDURE_ARITY_TUPLE_FIELD_INDEX = 2;
+
+ // Payload field containing serialized Procedure.
+ public static final int PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX = 3;
+
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE);
+
+ protected ProcedureTupleTranslator(boolean getTuple) {
+ super(getTuple, BADMetadataIndexes.NUM_FIELDS_PROCEDURE_IDX);
+ }
+
+ @Override
+ public Procedure getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
+ byte[] serRecord = frameTuple.getFieldData(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordStartOffset = frameTuple.getFieldStart(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordLength = frameTuple.getFieldLength(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
+ ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+ DataInput in = new DataInputStream(stream);
+ ARecord procedureRecord = recordSerDes.deserialize(in);
+ return createProcedureFromARecord(procedureRecord);
+ }
+
+ private Procedure createProcedureFromARecord(ARecord procedureRecord) {
+ String dataverseName =
+ ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX))
+ .getStringValue();
+ String procedureName =
+ ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX))
+ .getStringValue();
+ String arity = ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX)).getStringValue();
+
+ IACursor cursor = ((AOrderedList) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX)).getCursor();
+ List<String> params = new ArrayList<String>();
+ while (cursor.next()) {
+ params.add(((AString) cursor.get()).getStringValue());
+ }
+
+ String returnType = ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX))
+ .getStringValue();
+
+ String definition = ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX))
+ .getStringValue();
+
+ String language = ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX))
+ .getStringValue();
+
+ return new Procedure(dataverseName, procedureName, Integer.parseInt(arity), params, returnType, definition,
+ language);
+
+ }
+
+ @Override
+ public ITupleReference getTupleFromMetadataEntity(Procedure procedure) throws IOException, MetadataException {
+ // write the key in the first 2 fields of the tuple
+ tupleBuilder.reset();
+ aString.setValue(procedure.getEntityId().getDataverse());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+ aString.setValue(procedure.getEntityId().getEntityName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+ aString.setValue(procedure.getArity() + "");
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ // write the pay-load in the fourth field of the tuple
+
+ recordBuilder.reset(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(procedure.getEntityId().getDataverse());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX, fieldValue);
+
+ // write field 1
+ fieldValue.reset();
+ aString.setValue(procedure.getEntityId().getEntityName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 2
+ fieldValue.reset();
+ aString.setValue(procedure.getArity() + "");
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX, fieldValue);
+
+ // write field 3
+ OrderedListBuilder listBuilder = new OrderedListBuilder();
+ ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage();
+ listBuilder.reset((AOrderedListType) BADMetadataRecordTypes.PROCEDURE_RECORDTYPE
+ .getFieldTypes()[BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX]);
+ for (String param : procedure.getParams()) {
+ itemValue.reset();
+ aString.setValue(param);
+ stringSerde.serialize(aString, itemValue.getDataOutput());
+ listBuilder.addItem(itemValue);
+ }
+ fieldValue.reset();
+ listBuilder.write(fieldValue.getDataOutput(), true);
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX, fieldValue);
+
+ // write field 4
+ fieldValue.reset();
+ aString.setValue(procedure.getReturnType());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX, fieldValue);
+
+ // write field 5
+ fieldValue.reset();
+ aString.setValue(procedure.getBody());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX, fieldValue);
+
+ // write field 6
+ fieldValue.reset();
+ aString.setValue(procedure.getLanguage());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX, fieldValue);
+
+ // write record
+ recordBuilder.write(tupleBuilder.getDataOutput(), true);
+ tupleBuilder.addFieldEndOffset();
+
+ tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ return tuple;
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
new file mode 100644
index 0000000..89f0d20
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.operators.CommitOperator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.runtime.NotifyBrokerOperator;
+import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+ if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
+ return false;
+ }
+ AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+ if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
+ return false;
+ }
+ DelegateOperator eOp = (DelegateOperator) op;
+ if (!(eOp.getDelegate() instanceof CommitOperator)) {
+ return false;
+ }
+ AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+ if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+ return false;
+ }
+ InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
+ if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
+ return false;
+ }
+ DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
+ String datasetName = dds.getDataset().getDatasetName();
+ if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
+ || !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
+ || !datasetName.endsWith("Results")) {
+ return false;
+ }
+ String channelDataverse = dds.getDataset().getDataverseName();
+ //Now we know that we are inserting into results
+
+ String channelName = datasetName.substring(0, datasetName.length() - 7);
+ String subscriptionsName = channelName + "Subscriptions";
+ //TODO: Can we check here to see if there is a channel with such a name?
+
+ DataSourceScanOperator subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
+ if (subscriptionsScan == null) {
+ return false;
+ }
+
+ //Now we want to make sure and set the commit to be a nonsink commit
+ ((CommitOperator) eOp.getDelegate()).setSink(false);
+
+ //Now we need to get the broker EndPoint
+ LogicalVariable brokerEndpointVar = context.newVar();
+ AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
+ AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
+ //now brokerNameVar holds the brokerName for use farther up in the plan
+
+ context.computeAndSetTypeEnvironmentForOperator(assignOp);
+ context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
+ context.computeAndSetTypeEnvironmentForOperator(eOp);
+
+ //get subscriptionIdVar
+ LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
+
+ //The channelExecutionTime is created just before the scan
+ LogicalVariable channelExecutionVar = ((AssignOperator) subscriptionsScan.getInputs().get(0).getValue())
+ .getVariables().get(0);
+
+ ProjectOperator badProject = (ProjectOperator) findOp(op, "project");
+ badProject.getVariables().add(subscriptionIdVar);
+ badProject.getVariables().add(brokerEndpointVar);
+ badProject.getVariables().add(channelExecutionVar);
+ context.computeAndSetTypeEnvironmentForOperator(badProject);
+
+ //Create my brokerNotify plan above the extension Operator
+ DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar,
+ context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName);
+
+ opRef.setValue(dOp);
+
+ return true;
+ }
+
+ private DelegateOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar,
+ LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext context,
+ ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName)
+ throws AlgebricksException {
+ //create the Distinct Op
+ ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
+ VariableReferenceExpression vExpr = new VariableReferenceExpression(subscriptionIdVar);
+ expressions.add(new MutableObject<ILogicalExpression>(vExpr));
+ DistinctOperator distinctOp = new DistinctOperator(expressions);
+
+ //create the GroupBy Op
+ //And set the distinct as input
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ List<ILogicalPlan> nestedPlans = new ArrayList<ILogicalPlan>();
+
+ //create group by operator
+ GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
+ groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar));
+ groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar));
+ groupbyOp.getInputs().add(new MutableObject<ILogicalOperator>(distinctOp));
+
+ //create nested plan for subscription ids in group by
+ NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator(
+ new MutableObject<ILogicalOperator>(groupbyOp));
+ //TODO: This is from translationcontext. It might be needed to make the variable exist outside of the subplan
+ //LogicalVariable subscriptionListVar = context.newSubplanOutputVar();
+ LogicalVariable subscriptionListVar = context.newVar();
+ List<LogicalVariable> aggVars = new ArrayList<LogicalVariable>();
+ aggVars.add(subscriptionListVar);
+ AggregateFunctionCallExpression funAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(
+ AsterixBuiltinFunctions.LISTIFY, new ArrayList<Mutable<ILogicalExpression>>());
+ funAgg.getArguments()
+ .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(subscriptionIdVar)));
+ List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ aggExpressions.add(new MutableObject<ILogicalExpression>(funAgg));
+ AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions);
+ listifyOp.getInputs().add(new MutableObject<ILogicalOperator>(nestedTupleSourceOp));
+
+ //add nested plans
+ nestedPlans.add(new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(listifyOp)));
+
+ //Create the NotifyBrokerOperator
+ NotifyBrokerOperator notifyBrokerOp = new NotifyBrokerOperator(brokerEndpointVar, subscriptionListVar,
+ channelExecutionVar);
+ EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
+ NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
+ notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
+ DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
+ extensionOp.setPhysicalOperator(notifyBrokerPOp);
+ extensionOp.getInputs().add(new MutableObject<ILogicalOperator>(groupbyOp));
+
+ //Set the input for the brokerNotify as the replicate operator
+ distinctOp.getInputs().add(new MutableObject<ILogicalOperator>(eOp));
+
+ //compute environment bottom up
+
+ context.computeAndSetTypeEnvironmentForOperator(distinctOp);
+ context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
+ context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp);
+ context.computeAndSetTypeEnvironmentForOperator(listifyOp);
+ context.computeAndSetTypeEnvironmentForOperator(extensionOp);
+
+ return extensionOp;
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
+ AbstractLogicalOperator opAboveBrokersScan) {
+ Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
+ new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint))));
+ DataSourceScanOperator brokerScan = null;
+ int index = 0;
+ for (Mutable<ILogicalOperator> subOp : opAboveBrokersScan.getInputs()) {
+ if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+ brokerScan = (DataSourceScanOperator) subOp.getValue();
+ break;
+ }
+ index++;
+ }
+ Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
+ new VariableReferenceExpression(brokerScan.getVariables().get(2)));
+
+ ScalarFunctionCallExpression fieldAccessByName = new ScalarFunctionCallExpression(
+ FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
+ ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(1);
+ varArray.add(brokerEndpointVar);
+ ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(1);
+ exprArray.add(new MutableObject<ILogicalExpression>(fieldAccessByName));
+
+ AssignOperator assignOp = new AssignOperator(varArray, exprArray);
+
+ //Place assignOp between the scan and the op above it
+ assignOp.getInputs().add(new MutableObject<ILogicalOperator>(brokerScan));
+ opAboveBrokersScan.getInputs().set(index, new MutableObject<ILogicalOperator>(assignOp));
+
+ return assignOp;
+ }
+
+ /*This function searches for the needed op
+ * If lookingForBrokers, find the op above the brokers scan
+ * Else find the suscbriptionsScan
+ */
+ private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
+ if (!op.hasInputs()) {
+ return null;
+ }
+ for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
+ if (lookingForString.equals("brokers")) {
+ if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+ return op;
+ } else {
+ AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
+ lookingForString);
+ if (nestedOp != null) {
+ return nestedOp;
+ }
+ }
+
+ } else if (lookingForString.equals("project")) {
+ if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) {
+ return (AbstractLogicalOperator) subOp.getValue();
+ } else {
+ AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
+ lookingForString);
+ if (nestedOp != null) {
+ return nestedOp;
+ }
+ }
+ }
+
+ else {
+ if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) {
+ return (AbstractLogicalOperator) subOp.getValue();
+ } else {
+ AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
+ lookingForString);
+ if (nestedOp != null) {
+ return nestedOp;
+ }
+ }
+
+ }
+ }
+ return null;
+ }
+
+ private boolean isBrokerScan(AbstractLogicalOperator op) {
+ if (op instanceof DataSourceScanOperator) {
+ if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
+ DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
+ if (dds.getDataset().getDataverseName().equals("Metadata")
+ && dds.getDataset().getDatasetName().equals("Broker")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName) {
+ if (op instanceof DataSourceScanOperator) {
+ if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
+ DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
+ if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
+ && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
+ if (dds.getDataset().getDatasetName().equals(subscriptionsName)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
new file mode 100644
index 0000000..d281b49
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.Collection;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * A repetitive channel operator, which uses a Java timer to run a given query periodically
+ */
+public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator {
+ private final LogicalVariable subscriptionIdVar;
+ private final LogicalVariable brokerEndpointVar;
+ private final LogicalVariable channelExecutionVar;
+
+ public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable subscriptionIdVar,
+ LogicalVariable resultSetVar) {
+ this.brokerEndpointVar = brokerEndpointVar;
+ this.subscriptionIdVar = subscriptionIdVar;
+ this.channelExecutionVar = resultSetVar;
+ }
+
+ public LogicalVariable getSubscriptionVariable() {
+ return subscriptionIdVar;
+ }
+
+ public LogicalVariable getBrokerEndpointVariable() {
+ return brokerEndpointVar;
+ }
+
+ public LogicalVariable getChannelExecutionVariable() {
+ return channelExecutionVar;
+ }
+
+ @Override
+ public String toString() {
+ return "notify-brokers";
+ }
+
+ @Override
+ public boolean isMap() {
+ return false;
+ }
+
+ @Override
+ public IOperatorDelegate newInstance() {
+ return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar);
+ }
+
+ @Override
+ public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+ throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public void getUsedVariables(Collection<LogicalVariable> usedVars) {
+ usedVars.add(subscriptionIdVar);
+ usedVars.add(brokerEndpointVar);
+ usedVars.add(channelExecutionVar);
+ }
+
+ @Override
+ public void getProducedVariables(Collection<LogicalVariable> producedVars) {
+ // none produced
+
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
new file mode 100644
index 0000000..12d5ae2
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class NotifyBrokerPOperator extends AbstractPhysicalOperator {
+
+ private final EntityId entityId;
+
+ public NotifyBrokerPOperator(EntityId entityId) {
+ this.entityId = entityId;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.DELEGATE_OPERATOR;
+ }
+
+ @Override
+ public String toString() {
+ return "NOTIFY_BROKERS";
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ DelegateOperator notify = (DelegateOperator) op;
+ LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable();
+ LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
+ LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
+
+ int brokerColumn = inputSchemas[0].findVariable(brokerVar);
+ int subColumn = inputSchemas[0].findVariable(subVar);
+ int executionColumn = inputSchemas[0].findVariable(executionVar);
+
+ IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn);
+ IScalarEvaluatorFactory subEvalFactory = new ColumnAccessEvalFactory(subColumn);
+ IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn);
+
+ NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, subEvalFactory,
+ channelExecutionEvalFactory, entityId);
+
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+ context);
+
+ builder.contributeMicroOperator(op, runtime, recDesc);
+
+ // and contribute one edge from its child
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, notify, 0);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return true;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
new file mode 100644
index 0000000..8634e4c
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+ private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private final DataInputStream di = new DataInputStream(bbis);
+ private final AOrderedListSerializerDeserializer subSerDes = new AOrderedListSerializerDeserializer(
+ new AOrderedListType(BuiltinType.AUUID, null));
+
+ private IPointable inputArg0 = new VoidPointable();
+ private IPointable inputArg1 = new VoidPointable();
+ private IPointable inputArg2 = new VoidPointable();
+ private IScalarEvaluator eval0;
+ private IScalarEvaluator eval1;
+ private IScalarEvaluator eval2;
+ private final ActiveManager activeManager;
+ private final EntityId entityId;
+
+ public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
+ IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
+ EntityId activeJobId) throws HyracksDataException {
+ this.tRef = new FrameTupleReference();
+ eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
+ eval1 = subEvalFactory.createScalarEvaluator(ctx);
+ eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
+ this.activeManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+ .getApplicationObject()).getActiveManager();
+ this.entityId = activeJobId;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ return;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+
+ eval0.evaluate(tRef, inputArg0);
+ eval1.evaluate(tRef, inputArg1);
+ eval2.evaluate(tRef, inputArg2);
+
+ int serBrokerOffset = inputArg0.getStartOffset();
+ bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
+ AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di);
+
+ int serSubOffset = inputArg1.getStartOffset();
+ bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
+ AOrderedList subs = subSerDes.deserialize(di);
+
+ int resultSetOffset = inputArg2.getStartOffset();
+ bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
+ ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
+ String executionTimeString;
+ try {
+ executionTimeString = executionTime.toSimpleString();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+
+ ChannelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs,
+ executionTimeString);
+
+ }
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ return;
+ }
+
+ @Override
+ public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ this.inputRecordDesc = recordDescriptor;
+ this.tAccess = new FrameTupleAccessor(inputRecordDesc);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ return;
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ failed = true;
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
new file mode 100644
index 0000000..d5d05cf
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class NotifyBrokerRuntimeFactory implements IPushRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final IScalarEvaluatorFactory brokerEvalFactory;
+ private final IScalarEvaluatorFactory subEvalFactory;
+ private final IScalarEvaluatorFactory channelExecutionEvalFactory;
+ private final EntityId entityId;
+
+ public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory, IScalarEvaluatorFactory subEvalFactory,
+ IScalarEvaluatorFactory channelExecutionEvalFactory, EntityId entityId) {
+ this.brokerEvalFactory = brokerEvalFactory;
+ this.subEvalFactory = subEvalFactory;
+ this.channelExecutionEvalFactory = channelExecutionEvalFactory;
+ this.entityId = entityId;
+ }
+
+ @Override
+ public String toString() {
+ return "notify-broker";
+ }
+
+ @Override
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new NotifyBrokerRuntime(ctx, brokerEvalFactory, subEvalFactory, channelExecutionEvalFactory, entityId);
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
new file mode 100644
index 0000000..8093977
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * A repetitive channel operator, which uses a Java timer to run a given query periodically
+ */
+public class RepetitiveChannelOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorDescriptor.class.getName());
+
+ /** The unique identifier of the job. **/
+ protected final EntityId entityId;
+
+ protected final JobSpecification jobSpec;
+
+ private final String duration;
+
+ private String strIP;
+ private int port;
+
+ public RepetitiveChannelOperatorDescriptor(JobSpecification spec, String dataverseName, String channelName,
+ String duration, JobSpecification channeljobSpec, String strIP, int port) {
+ super(spec, 0, 0);
+ this.entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
+ this.jobSpec = channeljobSpec;
+ this.duration = duration;
+ this.strIP = strIP;
+ this.port = port;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ ActiveRuntimeId runtimeId = new ActiveRuntimeId(entityId,
+ RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition);
+ return new RepetitiveChannelOperatorNodePushable(ctx, runtimeId, jobSpec, duration, strIP, port);
+ }
+
+ public String getDuration() {
+ return duration;
+ }
+
+ public EntityId getEntityId() {
+ return entityId;
+ }
+
+ public JobSpecification getJobSpec() {
+ return jobSpec;
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
new file mode 100644
index 0000000..1bbe331
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.EnumSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class RepetitiveChannelOperatorNodePushable extends ActiveSourceOperatorNodePushable {
+
+ private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorNodePushable.class.getName());
+
+ private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ private final JobSpecification jobSpec;
+ private long duration;
+ private final HyracksConnection hcc;
+
+ public RepetitiveChannelOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId,
+ JobSpecification channeljobSpec, String duration, String strIP, int port) throws HyracksDataException {
+ super(ctx, runtimeId);
+ this.jobSpec = channeljobSpec;
+ this.duration = ChannelJobService.findPeriod(duration);
+ try {
+ hcc = new HyracksConnection(strIP, port);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+
+ @Override
+ protected void start() throws HyracksDataException, InterruptedException {
+ try {
+ scheduledExecutorService =
+ ChannelJobService.startJob(jobSpec, EnumSet.noneOf(JobFlag.class), null, hcc, duration);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ while (!scheduledExecutorService.isTerminated()) {
+
+ }
+
+ }
+
+ @Override
+ protected void abort() throws HyracksDataException, InterruptedException {
+ scheduledExecutorService.shutdown();
+ }
+
+}
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
new file mode 100644
index 0000000..94b4c78
--- /dev/null
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -0,0 +1,206 @@
+import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelSubscribeStatement;
+import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
+import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
+import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateProcedureStatement;
+
+
+@merge
+Statement SingleStatement() throws ParseException:
+{
+ // merge area 1
+ before:
+ after:
+}
+{
+ (
+ // merge area 2
+ before:
+ after: | stmt = ChannelSubscriptionStatement())
+ {
+ // merge area 3
+ }
+}
+
+@merge
+Statement CreateStatement() throws ParseException:
+{
+ // merge area 1
+ before:
+ after:
+}
+{
+ (
+ // merge area 2
+ before:
+ after: | stmt = ChannelSpecification() | stmt = BrokerSpecification() | stmt = ProcedureSpecification())
+ {
+ // merge area 3
+ }
+}
+
+@merge
+Statement DropStatement() throws ParseException:
+{
+ // merge area 1
+ before:
+ after:
+}
+{
+ (
+ // merge area 2
+ before:
+ after: | "channel" pairId = QualifiedName() ifExists = IfExists()
+ {
+ stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists);
+ }
+ | "broker" pairId = QualifiedName() ifExists = IfExists()
+ {
+ stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);
+ }
+ )
+ {
+ // merge area 3
+ }
+}
+
+@new
+CreateChannelStatement ChannelSpecification() throws ParseException:
+{
+ Pair<Identifier,Identifier> nameComponents = null;
+ FunctionSignature appliedFunction = null;
+ CreateChannelStatement ccs = null;
+ String fqFunctionName = null;
+ Expression period = null;
+ boolean distributed = false;
+}
+{
+ (
+ "repetitive" "channel" nameComponents = QualifiedName()
+ <USING> appliedFunction = FunctionSignature()
+ "period" period = FunctionCallExpr() ("distributed" { distributed = true; })?
+ {
+ ccs = new CreateChannelStatement(nameComponents.first,
+ nameComponents.second, appliedFunction, period, distributed);
+ }
+ )
+ {
+ return ccs;
+ }
+}
+
+
+@new
+CreateProcedureStatement ProcedureSpecification() throws ParseException:
+{
+ Pair<Identifier,Identifier> nameComponents = null;
+ FunctionSignature signature;
+ List<VarIdentifier> paramList = new ArrayList<VarIdentifier>();
+ String functionBody;
+ Token beginPos;
+ Token endPos;
+ Expression functionBodyExpr;
+}
+{
+ "procedure" nameComponents = QualifiedName()
+ paramList = ParameterList()
+ <LEFTBRACE>
+ {
+ beginPos = token;
+ }
+ functionBodyExpr = Expression() <RIGHTBRACE>
+ {
+ endPos = token;
+ functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn);
+ signature = new FunctionSignature(nameComponents.first.toString(), nameComponents.second.toString(), paramList.size());
+ removeCurrentScope();
+ return new CreateProcedureStatement(signature, paramList, functionBody);
+ }
+}
+
+
+
+
+@new
+CreateBrokerStatement BrokerSpecification() throws ParseException:
+{
+ CreateBrokerStatement cbs = null;
+ Pair<Identifier,Identifier> name = null;
+ String endPoint = null;
+}
+{
+ (
+ "broker" name = QualifiedName()
+ <AT> endPoint = StringLiteral()
+ {
+ cbs = new CreateBrokerStatement(name.first, name.second,endPoint);
+ }
+ )
+ {
+ return cbs;
+ }
+}
+
+@new
+Statement ChannelSubscriptionStatement() throws ParseException:
+{
+ Statement stmt = null;
+ Pair<Identifier,Identifier> nameComponents = null;
+ List<Expression> argList = new ArrayList<Expression>();
+ Expression tmp = null;
+ String id = null;
+ String subscriptionId = null;
+ Pair<Identifier,Identifier> brokerName = null;
+}
+{
+ (
+ "subscribe" <TO> nameComponents = QualifiedName()
+ <LEFTPAREN> (tmp = Expression()
+ {
+ argList.add(tmp);
+ }
+ (<COMMA> tmp = Expression()
+ {
+ argList.add(tmp);
+ }
+ )*)? <RIGHTPAREN> <ON> brokerName = QualifiedName()
+ {
+ stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+ }
+ | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
+ {
+ setDataverses(new ArrayList<String>());
+ setDatasets(new ArrayList<String>());
+ VariableExpr varExp = new VariableExpr();
+ VarIdentifier var = new VarIdentifier();
+ varExp.setVar(var);
+ var.setValue("$subscriptionPlaceholder");
+ getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
+ List<String> dataverses = getDataverses();
+ List<String> datasets = getDatasets();
+ // we remove the pointer to the dataverses and datasets
+ setDataverses(null);
+ setDatasets(null);
+ stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter(), dataverses, datasets);
+ }
+ | "change" "subscription" subscriptionId = StringLiteral() <ON> nameComponents = QualifiedName()
+ <LEFTPAREN> (tmp = Expression()
+ {
+ argList.add(tmp);
+ }
+ (<COMMA> tmp = Expression()
+ {
+ argList.add(tmp);
+ }
+ )*)? <RIGHTPAREN>
+ <TO> brokerName = QualifiedName()
+ {
+ stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+ }
+ )
+ {
+ return stmt;
+ }
+}
\ No newline at end of file
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
new file mode 100644
index 0000000..77e8afe
--- /dev/null
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.TestGroup;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the runtime test cases under 'src/test/resources/runtimets'.
+ */
+@RunWith(Parameterized.class)
+public class BADExecutionTest {
+
+ protected static final Logger LOGGER = Logger.getLogger(BADExecutionTest.class.getName());
+
+ protected static final String PATH_ACTUAL = "target/rttest" + File.separator;
+ protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" },
+ File.separator);
+
+ protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+
+ protected static AsterixTransactionProperties txnProperties;
+ private static final TestExecutor testExecutor = new TestExecutor();
+ private static final boolean cleanupOnStart = true;
+ private static final boolean cleanupOnStop = true;
+
+ protected static TestGroup FailedGroup;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ File outdir = new File(PATH_ACTUAL);
+ outdir.mkdirs();
+ ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ ExecutionTestUtil.tearDown(cleanupOnStop);
+ ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
+ }
+
+ @Parameters(name = "BADExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return buildTestsInXml("testsuite.xml");
+ }
+
+ protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+ Collection<Object[]> testArgs = new ArrayList<Object[]>();
+ TestCaseContext.Builder b = new TestCaseContext.Builder();
+ for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+ testArgs.add(new Object[] { ctx });
+ }
+ return testArgs;
+
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public BADExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, FailedGroup);
+ }
+}
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
new file mode 100644
index 0000000..ad2f1bf
--- /dev/null
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import java.io.File;
+import java.util.logging.Logger;
+
+import org.apache.asterix.bad.lang.BADCompilationProvider;
+import org.apache.asterix.bad.lang.BADQueryTranslatorFactory;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.IdentitiyResolverFactory;
+import org.apache.asterix.test.optimizer.OptimizerTest;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class BADOptimizerTest extends OptimizerTest {
+
+ private static final Logger LOGGER = Logger.getLogger(BADOptimizerTest.class.getName());
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+ final File outdir = new File(PATH_ACTUAL);
+ outdir.mkdirs();
+
+ extensionLangCompilationProvider = new BADCompilationProvider();
+ statementExecutorFactory = new BADQueryTranslatorFactory();
+
+ integrationUtil.init(true);
+ // Set the node resolver to be the identity resolver that expects node names
+ // to be node controller ids; a valid assumption in test environment.
+ System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
+ IdentitiyResolverFactory.class.getName());
+ }
+
+ public BADOptimizerTest(File queryFile, File expectedFile, File actualFile) {
+ super(queryFile, expectedFile, actualFile);
+ }
+
+}
diff --git a/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml b/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
new file mode 100644
index 0000000..c2f5d41
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
@@ -0,0 +1,110 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+ <metadataNode>asterix_nc1</metadataNode>
+ <store>
+ <ncId>asterix_nc1</ncId>
+ <storeDirs>iodevice0,iodevice1</storeDirs>
+ </store>
+ <store>
+ <ncId>asterix_nc2</ncId>
+ <storeDirs>iodevice0,iodevice1</storeDirs>
+ </store>
+ <transactionLogDir>
+ <ncId>asterix_nc1</ncId>
+ <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
+ </transactionLogDir>
+ <transactionLogDir>
+ <ncId>asterix_nc2</ncId>
+ <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
+ </transactionLogDir>
+ <extensions>
+ <extension>
+ <extensionClassName>org.apache.asterix.bad.lang.BADQueryTranslatorExtension</extensionClassName>
+ </extension>
+ <extension>
+ <extensionClassName>org.apache.asterix.bad.lang.BADLangExtension</extensionClassName>
+ </extension>
+ <extension>
+ <extensionClassName>org.apache.asterix.bad.metadata.BADMetadataExtension</extensionClassName>
+ </extension>
+ </extensions>
+ <property>
+ <name>max.wait.active.cluster</name>
+ <value>60</value>
+ <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
+ nodes are available)
+ before a submitted query/statement can be
+ executed. (Default = 60 seconds)
+ </description>
+ </property>
+ <property>
+ <name>log.level</name>
+ <value>WARNING</value>
+ <description>Log level for running tests/build</description>
+ </property>
+ <property>
+ <name>compiler.framesize</name>
+ <value>32768</value>
+ </property>
+ <property>
+ <name>compiler.sortmemory</name>
+ <value>327680</value>
+ </property>
+ <property>
+ <name>compiler.groupmemory</name>
+ <value>163840</value>
+ </property>
+ <property>
+ <name>compiler.joinmemory</name>
+ <value>163840</value>
+ </property>
+ <property>
+ <name>compiler.pregelix.home</name>
+ <value>~/pregelix</value>
+ </property>
+ <property>
+ <name>storage.buffercache.pagesize</name>
+ <value>32768</value>
+ <description>The page size in bytes for pages in the buffer cache.
+ (Default = "32768" // 32KB)
+ </description>
+ </property>
+ <property>
+ <name>storage.buffercache.size</name>
+ <value>33554432</value>
+ <description>The size of memory allocated to the disk buffer cache.
+ The value should be a multiple of the buffer cache page size(Default
+ = "33554432" // 32MB)
+ </description>
+ </property>
+ <property>
+ <name>storage.memorycomponent.numpages</name>
+ <value>8</value>
+ <description>The number of pages to allocate for a memory component.
+ (Default = 8)
+ </description>
+ </property>
+ <property>
+ <name>plot.activate</name>
+ <value>false</value>
+ <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
+ </description>
+ </property>
+</asterixConfiguration>
diff --git a/asterix-bad/src/test/resources/conf/cluster.xml b/asterix-bad/src/test/resources/conf/cluster.xml
new file mode 100644
index 0000000..8f0b694
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/cluster.xml
@@ -0,0 +1,49 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+ <instance_name>asterix</instance_name>
+ <store>storage</store>
+
+ <data_replication>
+ <enabled>false</enabled>
+ <replication_port>2016</replication_port>
+ <replication_factor>2</replication_factor>
+ <auto_failover>false</auto_failover>
+ <replication_time_out>30</replication_time_out>
+ </data_replication>
+
+ <master_node>
+ <id>master</id>
+ <client_ip>127.0.0.1</client_ip>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <client_port>1098</client_port>
+ <cluster_port>1099</cluster_port>
+ <http_port>8888</http_port>
+ </master_node>
+ <node>
+ <id>nc1</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <replication_port>2016</replication_port>
+ </node>
+ <node>
+ <id>nc2</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <replication_port>2017</replication_port>
+ </node>
+</cluster>
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/conf/hyracks-deployment.properties b/asterix-bad/src/test/resources/conf/hyracks-deployment.properties
new file mode 100644
index 0000000..17a6772
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/hyracks-deployment.properties
@@ -0,0 +1,21 @@
+#/*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+cc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.CCBootstrapImpl
+nc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.NCBootstrapImpl
+cc.ip=127.0.0.1
+cc.port=1098
diff --git a/asterix-bad/src/test/resources/conf/test.properties b/asterix-bad/src/test/resources/conf/test.properties
new file mode 100644
index 0000000..86269c8
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/test.properties
@@ -0,0 +1,22 @@
+#/*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+MetadataNode=nc1
+NewUniverse=true
+nc1.stores=nc1data
+nc2.stores=nc2data
+OutputDir=/tmp/asterix_output/
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql
new file mode 100644
index 0000000..4dc9291
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql
@@ -0,0 +1,36 @@
+/*
+ * Description : Check the Plan used by a channel
+ * Expected Res : Success
+ * Date : Mar 2015
+ */
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+
+create type TweetMessageTypeuuid as closed {
+ tweetid: uuid,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ countA: int32,
+ countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+ for $tweet in dataset TweetMessageuuids
+ let $circle := create-circle($location,30.0)
+ where contains($tweet.message-text,$text)
+ and spatial-intersect($tweet.sender-location, $location)
+ return $tweet.message-text
+};
+
+write output to nc1:"rttest/channel-create.adm";
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
new file mode 100644
index 0000000..dcc98da
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
@@ -0,0 +1,40 @@
+/*
+ * Description : Check the Plan for Subscribing to a channel
+ * Expected Res : Success
+ * Date : Mar 2015
+ */
+
+drop dataverse channels2 if exists;
+create dataverse channels2;
+use dataverse channels2;
+
+
+create type TweetMessageTypeuuid as closed {
+ tweetid: uuid,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ countA: int32,
+ countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+ for $tweet in dataset TweetMessageuuids
+ let $circle := create-circle($location,30.0)
+ where contains($tweet.message-text,$text)
+ and spatial-intersect($tweet.sender-location, $location)
+ return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
+
+write output to nc1:"rttest/channel-subscribe.adm";
+
+create broker brokerA at "http://www.hello.com";
+
+subscribe to nearbyTweetChannel (point("30.0, 30.0"), "Live") on brokerA;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
new file mode 100644
index 0000000..ed182ee
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
@@ -0,0 +1,38 @@
+/*
+ * Description : Check the Plan for Unsubscribing to a channel
+ * Expected Res : Success
+ * Date : Mar 2015
+ */
+
+drop dataverse channels3 if exists;
+create dataverse channels3;
+use dataverse channels3;
+
+
+create type TweetMessageTypeuuid as closed {
+ tweetid: uuid,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ countA: int32,
+ countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+ for $tweet in dataset TweetMessageuuids
+ let $circle := create-circle($location,30.0)
+ where contains($tweet.message-text,$text)
+ and spatial-intersect($tweet.sender-location, $location)
+ return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
+
+write output to nc1:"rttest/channel-unsubscribe.adm";
+
+unsubscribe "c45ef6d0-c5ae-4b9e-b5da-cf1932718296" from nearbyTweetChannel;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
new file mode 100644
index 0000000..889af1f
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
@@ -0,0 +1,57 @@
+-- NOTIFY_BROKERS |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$53, $$1] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$53(ASC), $$1(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$53, $$1] |PARTITIONED|
+ -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$40(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$40] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$36] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$45, $$47][$$41, $$42] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$45, $$47] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$41, $$42] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
new file mode 100644
index 0000000..c5871f9
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
@@ -0,0 +1,71 @@
+-- NOTIFY_BROKERS |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$53, $$1] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$53(ASC), $$1(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$53, $$1] |PARTITIONED|
+ -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$40(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$40] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$36] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$45, $$47][$$41, $$42] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$45, $$47] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$41, $$42] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$5] |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- STREAM_PROJECT |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
new file mode 100644
index 0000000..bdb8734
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
@@ -0,0 +1,71 @@
+-- NOTIFY_BROKERS |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$53, $$1] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$53(ASC), $$1(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$53, $$1] |PARTITIONED|
+ -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$40(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$40] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$36] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$45, $$47][$$41, $$42] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$45, $$47] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$41, $$42] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+-- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- MATERIALIZE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql
new file mode 100644
index 0000000..41b036a
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql
@@ -0,0 +1,34 @@
+/*
+* Description : Create Channel Test. Confirms that the subscription and result datasets are created
+* Expected Res : Success
+* Date : March 2015
+* Author : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+ tweetid: uuid,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ countA: int32,
+ countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+ for $tweet in dataset TweetMessageuuids
+ let $circle := create-circle($location,30.0)
+ where contains($tweet.message-text,$text)
+ and spatial-intersect($tweet.sender-location, $location)
+ return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql
new file mode 100644
index 0000000..eb341e9
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql
@@ -0,0 +1,7 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel where $result.ChannelName = "nearbyTweetChannel"
+for $x in dataset Metadata.Dataset
+where $x.DatasetName = $result.SubscriptionsDatasetName
+or $x.DatasetName = $result.ResultsDatasetName
+return $x;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql
new file mode 100644
index 0000000..7bace03
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql
@@ -0,0 +1,34 @@
+/*
+* Description : Create Channel Test
+* Expected Res : Success
+* Date : March 2015
+* Author : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+ tweetid: uuid,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ countA: int32,
+ countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+ for $tweet in dataset TweetMessageuuids
+ let $circle := create-circle($location,30.0)
+ where contains($tweet.message-text,$text)
+ and spatial-intersect($tweet.sender-location, $location)
+ return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql
new file mode 100644
index 0000000..9a1e170
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel return $result;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql
new file mode 100644
index 0000000..afc7d5e
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql
@@ -0,0 +1,38 @@
+/*
+* Description : Drop Channel Test. Check Metadata
+* Expected Res : Success
+* Date : March 2015
+* Author : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+ tweetid: uuid,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ countA: int32,
+ countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+ for $tweet in dataset TweetMessageuuids
+ let $circle := create-circle($location,30.0)
+ where contains($tweet.message-text,$text)
+ and spatial-intersect($tweet.sender-location, $location)
+ return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel1 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel2 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel3 using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.ddl.aql
new file mode 100644
index 0000000..f466b9c
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.ddl.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+drop channel nearbyTweetChannel2;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql
new file mode 100644
index 0000000..e762a27
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql
@@ -0,0 +1,7 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel
+for $x in dataset Metadata.Dataset
+where $x.DatasetName = $result.SubscriptionsDatasetName
+or $x.DatasetName = $result.ResultsDatasetName
+return $x;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql
new file mode 100644
index 0000000..afc7d5e
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql
@@ -0,0 +1,38 @@
+/*
+* Description : Drop Channel Test. Check Metadata
+* Expected Res : Success
+* Date : March 2015
+* Author : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+ tweetid: uuid,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ countA: int32,
+ countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+ for $tweet in dataset TweetMessageuuids
+ let $circle := create-circle($location,30.0)
+ where contains($tweet.message-text,$text)
+ and spatial-intersect($tweet.sender-location, $location)
+ return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel1 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel2 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel3 using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.ddl.aql
new file mode 100644
index 0000000..f466b9c
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.ddl.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+drop channel nearbyTweetChannel2;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql
new file mode 100644
index 0000000..bd73c12
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql
@@ -0,0 +1,5 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel
+order by $result.ChannelName
+return $result;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.aql
new file mode 100644
index 0000000..29b56e1
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.aql
@@ -0,0 +1,56 @@
+/*
+* Description : Room Occupants Test
+* Expected Res : Success
+* Date : Sep 2016
+* Author : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type userLocation as {
+ userId: int,
+ roomNumber: int
+}
+create type watchedUser as {
+ userId: int,
+ name: string
+}
+create type roomSecurity as {
+ roomNumber: int,
+ securityGuardName: string,
+ securityGuardNumber: string
+}
+
+create dataset watchedUsers(watchedUser)
+primary key userId;
+
+create dataset roomSecurityAssignments(roomSecurity)
+primary key roomNumber;
+
+upsert into dataset roomSecurityAssignments([
+{"roomNumber":123, "securityGuardName":"Mike", "securityGuardNumber":"555-4815"},
+{"roomNumber":222, "securityGuardName":"Steven", "securityGuardNumber":"555-1623"},
+{"roomNumber":350, "securityGuardName":"Vassilis", "securityGuardNumber":"555-1234"}]
+);
+
+upsert into dataset watchedUsers([
+{"userId":1, "name":"suspectNumber1"}]
+);
+
+
+create dataset UserLocations(userLocation)
+primary key userId;
+
+create function RoomOccupants($room) {
+ for $location in dataset UserLocations
+ where $location.roomNumber = $room
+ return $location.userId
+};
+
+create broker brokerA at "http://www.notifyA.com";
+create broker brokerB at "http://www.notifyB.com";
+
+
+create repetitive channel roomRecords using RoomOccupants@1 period duration("PT5S");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.2.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.2.update.aql
new file mode 100644
index 0000000..8e15e19
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.2.update.aql
@@ -0,0 +1,12 @@
+/*
+* Description : Room Occupants Test
+* Expected Res : Success
+* Date : Sep 2016
+* Author : Steven Jacobs
+*/
+
+
+use dataverse channels;
+
+subscribe to roomRecords (123) on brokerA;
+subscribe to roomRecords (350) on brokerB;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.3.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.3.query.aql
new file mode 100644
index 0000000..15d6a5e
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.3.query.aql
@@ -0,0 +1,15 @@
+/*
+* Description : Room Occupants Test
+* Expected Res : Success
+* Date : Sep 2016
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+
+from $test in dataset roomRecordsSubscriptions
+order by $test.BrokerName
+select {
+"broker":$test.BrokerName,
+"parameter":$test.param0
+}
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.update.aql
new file mode 100644
index 0000000..15ebf7f
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.update.aql
@@ -0,0 +1,16 @@
+/*
+* Description : Room Occupants Test
+* Expected Res : Success
+* Date : Sep 2016
+* Author : Steven Jacobs
+*/
+
+
+use dataverse channels;
+
+
+upsert into dataset UserLocations([
+{"userId":1, "roomNumber":123},
+{"userId":2, "roomNumber":222},
+{"userId":3, "roomNumber":350}]
+);
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.5.sleep.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.5.sleep.aql
new file mode 100644
index 0000000..891eeea
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.5.sleep.aql
@@ -0,0 +1,7 @@
+/*
+* Description : Room Occupants Test
+* Expected Res : Success
+* Date : Sep 2016
+* Author : Steven Jacobs
+*/
+5000
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.6.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.6.update.aql
new file mode 100644
index 0000000..74f39a4
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.6.update.aql
@@ -0,0 +1,14 @@
+/*
+* Description : Room Occupants Test
+* Expected Res : Success
+* Date : Sep 2016
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+
+upsert into dataset UserLocations([
+{"userId":1, "roomNumber":222},
+{"userId":2, "roomNumber":222},
+{"userId":3, "roomNumber":222}]
+);
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.7.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.7.query.aql
new file mode 100644
index 0000000..f6295f0
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.7.query.aql
@@ -0,0 +1,12 @@
+/*
+* Description : Room Occupants Test
+* Expected Res : Success
+* Date : Sep 2016
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+
+from $result in dataset roomRecordsResults
+order by $result.result
+select $result.result;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
new file mode 100644
index 0000000..d84ea65
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
@@ -0,0 +1,36 @@
+/*
+* Description : Create Channel Test. Confirms that the subscription and result datasets are created
+* Expected Res : Success
+* Date : March 2015
+* Author : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+ tweetid: uuid,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ countA: int32,
+ countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+ for $tweet in dataset TweetMessageuuids
+ let $circle := create-circle($location,30.0)
+ where contains($tweet.message-text,$text)
+ and spatial-intersect($tweet.sender-location, $location)
+ return $tweet.message-text
+};
+
+create broker brokerA at "http://www.notifyA.com";
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql
new file mode 100644
index 0000000..8d7df53
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+subscribe to nearbyTweetChannel (point("30.0, 30.0"), "Live") on brokerA;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.update.aql
new file mode 100644
index 0000000..f3e19af
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.update.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+subscribe to nearbyTweetChannel (point("20.0, 20.0"), "Long") on brokerA;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.4.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.4.update.aql
new file mode 100644
index 0000000..b426495
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.4.update.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+subscribe to nearbyTweetChannel (point("10.0, 10.0"), "Prosper") on brokerA;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.aql
new file mode 100644
index 0000000..4937840
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.aql
@@ -0,0 +1,5 @@
+use dataverse channels;
+
+for $test in dataset nearbyTweetChannelSubscriptions
+order by $test.param1
+return $test.param1;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
new file mode 100644
index 0000000..dde1ee9
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
@@ -0,0 +1,2 @@
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannelResults", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{ }}, "Timestamp": "Mon Sep 12 13:48:16 PDT 2016", "DatasetId": 103, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannelSubscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "subscriptionId" ] ], "PrimaryKey": [ [ "subscriptionId" ] ], "Autogenerated": true }, "Hints": {{ }}, "Timestamp": "Mon Sep 12 13:48:16 PDT 2016", "DatasetId": 102, "PendingOp": 0 }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
new file mode 100644
index 0000000..e009733
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
@@ -0,0 +1 @@
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel", "SubscriptionsDatasetName": "nearbyTweetChannelSubscriptions", "ResultsDatasetName": "nearbyTweetChannelResults", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
new file mode 100644
index 0000000..4002a62
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
@@ -0,0 +1,4 @@
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel1Results", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{ }}, "Timestamp": "Tue Sep 13 09:50:56 PDT 2016", "DatasetId": 103, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel1Subscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "subscriptionId" ] ], "PrimaryKey": [ [ "subscriptionId" ] ], "Autogenerated": true }, "Hints": {{ }}, "Timestamp": "Tue Sep 13 09:50:56 PDT 2016", "DatasetId": 102, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel3Results", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{ }}, "Timestamp": "Tue Sep 13 09:50:58 PDT 2016", "DatasetId": 107, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel3Subscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "DEFAULT_NG_ALL_NODES", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "subscriptionId" ] ], "PrimaryKey": [ [ "subscriptionId" ] ], "Autogenerated": true }, "Hints": {{ }}, "Timestamp": "Tue Sep 13 09:50:58 PDT 2016", "DatasetId": 106, "PendingOp": 0 }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
new file mode 100644
index 0000000..1da5787
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
@@ -0,0 +1,2 @@
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", "SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", "ResultsDatasetName": "nearbyTweetChannel1Results", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", "SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", "ResultsDatasetName": "nearbyTweetChannel3Results", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.3.adm b/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.3.adm
new file mode 100644
index 0000000..8f3c264
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.3.adm
@@ -0,0 +1,2 @@
+{ "broker": "brokerA", "parameter": 123 }
+{ "broker": "brokerB", "parameter": 350 }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.7.adm b/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.7.adm
new file mode 100644
index 0000000..c396a2c
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.7.adm
@@ -0,0 +1,2 @@
+1
+3
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm
new file mode 100644
index 0000000..d9268fb
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm
@@ -0,0 +1,3 @@
+"Live"
+"Long"
+"Prosper"
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/testsuite.xml b/asterix-bad/src/test/resources/runtimets/testsuite.xml
new file mode 100644
index 0000000..997dc77
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/testsuite.xml
@@ -0,0 +1,37 @@
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org"
+ ResultOffsetPath="results"
+ QueryOffsetPath="queries"
+ QueryFileExtension=".aql">
+ <test-group name="channel">
+ <test-case FilePath="channel">
+ <compilation-unit name="room_occupants">
+ <output-dir compare="Text">room_occupants</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="channel">
+ <compilation-unit name="create_channel_check_datasets">
+ <output-dir compare="Text">create_channel_check_datasets</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="channel">
+ <compilation-unit name="create_channel_check_metadata">
+ <output-dir compare="Text">create_channel_check_metadata</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="channel">
+ <compilation-unit name="drop_channel_check_datasets">
+ <output-dir compare="Text">drop_channel_check_datasets</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="channel">
+ <compilation-unit name="drop_channel_check_metadata">
+ <output-dir compare="Text">drop_channel_check_metadata</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="channel">
+ <compilation-unit name="subscribe_channel_check_subscriptions">
+ <output-dir compare="Text">subscribe_channel_check_subscriptions</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>