Updated to match code changes to asterix
Added Procedure Langauge and Metadata
Restructured to fit with bom pom
Added ChannelJobService for execution tasks
Added string constants file
Added BAD Rewrite Rule Set
Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a
diff --git a/.gitignore b/.gitignore
index a4ee8e4..42fc47d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@
git.properties
.DS_Store
*.swp
+build
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
new file mode 100644
index 0000000..1a26d5a
--- /dev/null
+++ b/asterix-bad/pom.xml
@@ -0,0 +1,218 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.asterix.bad</groupId>
+ <artifactId>asterix-opt</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>asterix-bad</artifactId>
+ <properties>
+ <asterix.version>0.8.9-SNAPSHOT</asterix.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-grammar-extension-maven-plugin</artifactId>
+ <version>${asterix.version}</version>
+ <configuration>
+ <base>${project.basedir}</base>
+ <gbase>../../asterix-lang-aql/src/main/javacc/AQL.jj</gbase>
+ <gextension>src/main/resources/lang-extension/lang.txt</gextension>
+ <output>target/generated-resources/javacc/grammar.jj</output>
+ <parserClassName>BADAQLParser</parserClassName>
+ <packageName>org.apache.asterix.bad.lang</packageName>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>grammarix</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>javacc</id>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ <configuration>
+ <isStatic>false</isStatic>
+ <javaUnicodeEscape>true</javaUnicodeEscape>
+ <sourceDirectory>target/generated-resources/javacc</sourceDirectory>
+ </configuration>
+ </execution>
+ <execution>
+ <id>javacc-jjdoc</id>
+ <goals>
+ <goal>jjdoc</goal>
+ </goals>
+ <phase>process-sources</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources/javacc/</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-grammar-extension-maven-plugin</artifactId>
+ <versionRange>[${asterix.version},)</versionRange>
+ <goals>
+ <goal>grammarix</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <versionRange>[2.6,)</versionRange>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-om</artifactId>
+ <version>${asterix.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-test-support</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-runtime</artifactId>
+ <version>${asterix.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>algebricks-compiler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>${asterix.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-test-framework</artifactId>
+ <version>${asterix.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-active</artifactId>
+ <version>${asterix.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-algebra</artifactId>
+ <version>${asterix.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-app</artifactId>
+ <version>${asterix.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-app</artifactId>
+ <version>${asterix.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>${asterix.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
new file mode 100644
index 0000000..a906ae6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+public interface BADConstants {
+ public static final String SubscriptionId = "subscriptionId";
+ public static final String BrokerName = "BrokerName";
+ public static final String ChannelName = "ChannelName";
+ public static final String ProcedureName = "ProcedureName";
+ public static final String DataverseName = "DataverseName";
+ public static final String BrokerEndPoint = "BrokerEndPoint";
+ public static final String DeliveryTime = "deliveryTime";
+ public static final String ResultId = "resultId";
+ public static final String ChannelExecutionTime = "channelExecutionTime";
+ public static final String ChannelSubscriptionsType = "ChannelSubscriptionsType";
+ public static final String ChannelResultsType = "ChannelResultsType";
+ public static final String ResultsDatasetName = "ResultsDatasetName";
+ public static final String SubscriptionsDatasetName = "SubscriptionsDatasetName";
+ public static final String CHANNEL_EXTENSION_NAME = "Channel";
+ public static final String PROCEDURE_KEYWORD = "Procedure";
+ public static final String BROKER_KEYWORD = "Broker";
+ public static final String RECORD_TYPENAME_BROKER = "BrokerRecordType";
+ public static final String RECORD_TYPENAME_CHANNEL = "ChannelRecordType";
+ public static final String RECORD_TYPENAME_PROCEDURE = "ProcedureRecordType";
+ public static final String subscriptionEnding = "Subscriptions";
+ public static final String resultsEnding = "Results";
+ public static final String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
+ public static final String BAD_DATAVERSE_NAME = "Metadata";
+ public static final String Duration = "Duration";
+ public static final String Function = "Function";
+ public static final String FIELD_NAME_ARITY = "Arity";
+ public static final String FIELD_NAME_PARAMS = "Params";
+ public static final String FIELD_NAME_RETURN_TYPE = "ReturnType";
+ public static final String FIELD_NAME_DEFINITION = "Definition";
+ public static final String FIELD_NAME_LANGUAGE = "Language";
+
+ public enum ChannelJobType {
+ REPETITIVE
+ }
+}
diff --git a/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
similarity index 100%
rename from src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
new file mode 100644
index 0000000..d1df438
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.AUUID;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Provides functionality for running channel jobs and communicating with Brokers
+ */
+public class ChannelJobService {
+
+ private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());
+
+ public static ScheduledExecutorService startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
+ IHyracksClientConnection hcc, long duration)
+ throws Exception {
+ ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ executeJob(jobSpec, jobFlags, jobId, hcc);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Channel Job Failed to run.", e);
+ }
+ }
+ }, duration, duration, TimeUnit.MILLISECONDS);
+ return scheduledExecutorService;
+ }
+
+ public static void executeJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
+ IHyracksClientConnection hcc)
+ throws Exception {
+ LOGGER.info("Executing Channel Job");
+ if (jobId == null) {
+ hcc.startJob(jobSpec, jobFlags);
+ } else {
+ hcc.startJob(jobSpec, jobFlags, jobId);
+ }
+ }
+
+ public static void runChannelJob(JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception {
+ JobId jobId = hcc.startJob(channeljobSpec);
+ hcc.waitForCompletion(jobId);
+ }
+
+ public static void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint,
+ AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException {
+ String formattedString;
+ formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
+ sendMessage(brokerEndpoint, formattedString);
+ }
+
+ public static String formatJSON(EntityId activeJobId, AOrderedList subscriptionIds, String channelExecutionTime) {
+ String JSON = "{ \"dataverseName\":\"" + activeJobId.getDataverse() + "\", \"channelName\":\""
+ + activeJobId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+ + channelExecutionTime + "\", \"subscriptionIds\":[";
+ for (int i = 0; i < subscriptionIds.size(); i++) {
+ AUUID subId = (AUUID) subscriptionIds.getItem(i);
+ String subString = subId.toSimpleString();
+ JSON += "\"" + subString + "\"";
+ if (i < subscriptionIds.size() - 1) {
+ JSON += ",";
+ }
+ }
+ JSON += "]}";
+ return JSON;
+
+ }
+
+ public static long findPeriod(String duration) {
+ //TODO: Allow Repetitive Channels to use YMD durations
+ String hoursMinutesSeconds = "";
+ if (duration.indexOf('T') != -1) {
+ hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
+ }
+ double seconds = 0;
+ if (hoursMinutesSeconds != "") {
+ int pos = 0;
+ if (hoursMinutesSeconds.indexOf('H') != -1) {
+ Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
+ seconds += (hours * 60 * 60);
+ pos = hoursMinutesSeconds.indexOf('H') + 1;
+ }
+ if (hoursMinutesSeconds.indexOf('M') != -1) {
+ Double minutes =
+ Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
+ seconds += (minutes * 60);
+ pos = hoursMinutesSeconds.indexOf('M') + 1;
+ }
+ if (hoursMinutesSeconds.indexOf('S') != -1) {
+ Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
+ seconds += (s);
+ }
+ }
+ return (long) (seconds * 1000);
+ }
+
+ public static void sendMessage(String targetURL, String urlParameters) {
+ HttpURLConnection connection = null;
+ try {
+ //Create connection
+ URL url = new URL(targetURL);
+ connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
+
+ connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
+ connection.setRequestProperty("Content-Language", "en-US");
+
+ connection.setUseCaches(false);
+ connection.setDoOutput(true);
+
+ if (connection.getOutputStream() != null) {
+ //Send message
+ DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
+ wr.writeBytes(urlParameters);
+ wr.close();
+ } else {
+ LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ int responseCode = connection.getResponseCode();
+ LOGGER.info("\nSending 'POST' request to URL : " + url);
+ LOGGER.info("Post parameters : " + urlParameters);
+ LOGGER.info("Response Code : " + responseCode);
+ }
+
+ if (connection.getInputStream() != null) {
+ BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+ String inputLine;
+ StringBuffer response = new StringBuffer();
+ while ((inputLine = in.readLine()) != null) {
+ response.append(inputLine);
+ }
+ in.close();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO, response.toString());
+ }
+ } else {
+ LOGGER.log(Level.WARNING, "Channel Failed to get response from Broker.");
+ }
+
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
+ } finally {
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ChannelJobService";
+ }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
similarity index 91%
rename from src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
index 42036af..0a6ced2 100644
--- a/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
@@ -20,6 +20,7 @@
import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.compiler.provider.IRuleSetFactory;
import org.apache.asterix.lang.aql.rewrites.AQLRewriterFactory;
import org.apache.asterix.lang.aql.visitor.AQLAstPrintVisitorFactory;
import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
@@ -49,4 +50,9 @@
return new AqlExpressionToPlanTranslatorFactory();
}
+ @Override
+ public IRuleSetFactory getRuleSetFactory() {
+ return new BADRuleSetFactory();
+ }
+
}
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
similarity index 69%
rename from src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
index 9832fe6..959600f 100644
--- a/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
@@ -25,19 +25,17 @@
import org.apache.asterix.bad.metadata.BrokerSearchKey;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.ChannelSearchKey;
+import org.apache.asterix.bad.metadata.DataverseBrokersSearchKey;
+import org.apache.asterix.bad.metadata.DataverseChannelsSearchKey;
+import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.bad.metadata.ProcedureSearchKey;
import org.apache.asterix.common.api.ExtensionId;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
public class BADLangExtension implements ILangExtension {
@@ -69,13 +67,6 @@
return ExtensionKind.LANG;
}
- @Override
- public boolean unnestToDataScan(Mutable<ILogicalOperator> opRef, IOptimizationContext context,
- UnnestOperator unnestOp, ILogicalExpression unnestExpr, AbstractFunctionCallExpression functionCallExpr)
- throws AlgebricksException {
- // TODO I dont need this?????
- return false;
- }
public static Broker getBroker(MetadataTransactionContext mdTxnCtx, String dataverseName, String brokerName)
throws AlgebricksException {
@@ -103,4 +94,29 @@
}
}
+ public static Procedure getProcedure(MetadataTransactionContext mdTxnCtx, String dataverseName,
+ String procedureName, String arity) throws AlgebricksException {
+ ProcedureSearchKey procedureSearchKey = new ProcedureSearchKey(dataverseName, procedureName, arity);
+ List<Procedure> procedures = MetadataManager.INSTANCE.getEntities(mdTxnCtx, procedureSearchKey);
+ if (procedures.isEmpty()) {
+ return null;
+ } else if (procedures.size() > 1) {
+ throw new AlgebricksException("Procedure search key returned more than one channel");
+ } else {
+ return procedures.get(0);
+ }
+ }
+
+ public static List<Broker> getBrokers(MetadataTransactionContext mdTxnCtx, String dataverseName)
+ throws AlgebricksException {
+ DataverseBrokersSearchKey brokerSearchKey = new DataverseBrokersSearchKey(dataverseName);
+ return MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey);
+ }
+
+ public static List<Channel> getChannels(MetadataTransactionContext mdTxnCtx, String dataverseName)
+ throws AlgebricksException {
+ DataverseChannelsSearchKey channelSearchKey = new DataverseChannelsSearchKey(dataverseName);
+ return MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
+ }
+
}
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java
similarity index 100%
rename from src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
similarity index 87%
rename from src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
index 4198230..20519dd 100644
--- a/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
@@ -20,10 +20,8 @@
import java.util.List;
-import org.apache.asterix.app.cc.CompilerExtensionManager;
import org.apache.asterix.app.cc.IStatementExecutorExtension;
import org.apache.asterix.common.api.ExtensionId;
-import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -33,8 +31,8 @@
BADQueryTranslatorExtension.class.getSimpleName(), 0);
private static class LazyHolder {
- private static final IStatementExecutorFactory INSTANCE = new BADQueryTranslatorFactory(
- (CompilerExtensionManager) AsterixAppContextInfo.INSTANCE.getExtensionManager());
+ private static final IStatementExecutorFactory INSTANCE = new BADQueryTranslatorFactory();
+
}
@Override
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
similarity index 83%
rename from src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
index b8a6050..958b14f 100644
--- a/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -20,22 +20,17 @@
import java.util.List;
-import org.apache.asterix.app.cc.CompilerExtensionManager;
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.common.app.SessionConfig;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.translator.SessionConfig;
public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {
- public BADQueryTranslatorFactory(CompilerExtensionManager ccExtensionManager) {
- super(ccExtensionManager);
- }
-
@Override
public QueryTranslator create(List<Statement> statements, SessionConfig conf,
ILangCompilationProvider compilationProvider) {
- return new BADStatementExecutor(statements, conf, compilationProvider, cExtensionManager);
+ return new BADStatementExecutor(statements, conf, compilationProvider);
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
new file mode 100644
index 0000000..31d8cd0
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
+import org.apache.asterix.compiler.provider.DefaultRuleSetFactory;
+import org.apache.asterix.compiler.provider.IRuleSetFactory;
+import org.apache.asterix.optimizer.base.RuleCollections;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
+import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class BADRuleSetFactory implements IRuleSetFactory {
+
+ @Override
+ public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites()
+ throws AlgebricksException {
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet = DefaultRuleSetFactory.buildLogical();
+ if (logicalRuleSet.size() != 14) {
+ throw new AlgebricksException("Incorrect RuleSet");
+ }
+ List<IAlgebraicRewriteRule> normalizationCollection = RuleCollections.buildNormalizationRuleCollection();
+
+ for (int i = 0; i < normalizationCollection.size(); i++) {
+ IAlgebraicRewriteRule rule = normalizationCollection.get(i);
+ if (rule instanceof UnnestToDataScanRule) {
+ normalizationCollection.add(i + 1, new InsertBrokerNotifierForChannelRule());
+ break;
+ }
+ }
+ SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
+ logicalRuleSet.set(3, new Pair<>(seqOnceCtrl, normalizationCollection));
+ logicalRuleSet.set(7, new Pair<>(seqOnceCtrl, normalizationCollection));
+ return logicalRuleSet;
+ }
+
+ @Override
+ public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites() {
+ return DefaultRuleSetFactory.buildPhysical();
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
new file mode 100644
index 0000000..fa18867
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.DataverseDropStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+
+public class BADStatementExecutor extends QueryTranslator {
+
+ public BADStatementExecutor(List<Statement> aqlStatements, SessionConfig conf,
+ ILangCompilationProvider compliationProvider) {
+ super(aqlStatements, conf, compliationProvider);
+ }
+
+
+ @Override
+ protected void handleDataverseDropStatement(MetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc) throws Exception {
+ //TODO: Remove this when metadata dependencies are in place
+ //TODO: Stop dataset drop when dataset used by channel
+ super.handleDataverseDropStatement(metadataProvider, stmt, hcc);
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
+ List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
+ for (Broker broker : brokers) {
+ BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false);
+ drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+ }
+ List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue());
+ for (Channel channel : channels) {
+ ChannelDropStatement drop = new ChannelDropStatement(dvId,
+ new Identifier(channel.getChannelId().getEntityName()), false);
+ drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
similarity index 95%
rename from src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
index 53b5ff7..7894c44 100644
--- a/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
@@ -27,7 +27,7 @@
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -76,7 +76,7 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
//TODO: dont drop a broker that's being used
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
similarity index 97%
rename from src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 0faefa3..6811ef2 100644
--- a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -43,7 +43,7 @@
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
@@ -94,7 +94,7 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
similarity index 84%
rename from src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 7d0cb1a..dc10742 100644
--- a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -29,20 +29,26 @@
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.aql.expression.FLWOGRExpression;
+import org.apache.asterix.lang.common.base.Clause;
import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.clause.LetClause;
import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.FieldAccessor;
import org.apache.asterix.lang.common.expression.FieldBinding;
import org.apache.asterix.lang.common.expression.LiteralExpr;
import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.lang.common.expression.VariableExpr;
import org.apache.asterix.lang.common.literal.StringLiteral;
import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.lang.common.statement.UpsertStatement;
import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
@@ -119,12 +125,13 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
- String brokerDataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(brokerDataverseName);
+ String brokerDataverse = ((QueryTranslator) statementExecutor)
+.getActiveDataverse(brokerDataverseName);
MetadataTransactionContext mdTxnCtx = null;
try {
@@ -182,18 +189,32 @@
subscriptionTuple.setVarCounter(varCounter);
if (subscriptionId == null) {
- List<String> returnField = new ArrayList<>();
- returnField.add(BADConstants.SubscriptionId);
+
+ VariableExpr subscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
+ VariableExpr useSubscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
+ VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
+ VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0));
+ useResultVar.setIsNewVar(false);
+ useSubscriptionVar.setIsNewVar(false);
+ Query returnQuery = new Query(false);
+ List<Clause> clauseList = new ArrayList<>();
+ LetClause let = new LetClause(subscriptionVar,
+ new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId)));
+ clauseList.add(let);
+ FLWOGRExpression body = new FLWOGRExpression(clauseList, useSubscriptionVar);
+ returnQuery.setBody(body);
+
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
metadataProvider.setResultAsyncMode(
resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
InsertStatement insert = new InsertStatement(new Identifier(dataverse),
- new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, false, returnField);
+ new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar,
+ returnQuery);
((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc, hdc,
resultDelivery, stats, false);
} else {
UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
- new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter);
+ new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc, hdc,
resultDelivery, stats, false);
}
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
similarity index 97%
rename from src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 50696b4..17a54ec 100644
--- a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -41,7 +41,7 @@
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
@@ -117,7 +117,7 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
similarity index 95%
rename from src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
index a4d0eaf..02389f1 100644
--- a/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -30,7 +30,7 @@
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -80,7 +80,7 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
similarity index 84%
rename from src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 824e725..77de93e 100644
--- a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -23,6 +23,7 @@
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -37,6 +38,7 @@
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.ChannelJobService;
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.ChannelEventsListener;
@@ -63,11 +65,11 @@
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.om.base.temporal.ADurationParserFactory;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
-import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.ClusterStateManager;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -83,6 +85,8 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
@@ -98,14 +102,16 @@
private InsertStatement channelResultsInsertQuery;
private String subscriptionsTableName;
private String resultsTableName;
+ private boolean distributed;
public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
- Expression period) {
+ Expression period, boolean distributed) {
this.channelName = channelName;
this.dataverseName = dataverseName;
this.function = function;
this.period = (CallExpr) period;
this.duration = "";
+ this.distributed = distributed;
}
public Identifier getDataverseName() {
@@ -177,7 +183,7 @@
}
public Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildChannelJobSpec(String dataverse,
- String channelName, String duration, AqlMetadataProvider metadataProvider, JobSpecification channeljobSpec,
+ String channelName, String duration, MetadataProvider metadataProvider, JobSpecification channeljobSpec,
String strIP, int port) throws Exception {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
IOperatorDescriptor channelQueryExecuter;
@@ -199,7 +205,7 @@
RepetitiveChannelOperatorDescriptor channelOp = new RepetitiveChannelOperatorDescriptor(jobSpec, dataverse,
channelName, duration, channeljobSpec, strIP, port);
- String partition = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations()[0];
+ String partition = ClusterStateManager.INSTANCE.getClusterLocations().getLocations()[0];
Set<String> ncs = new HashSet<>(Arrays.asList(partition));
AlgebricksAbsolutePartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
ncs.toArray(new String[ncs.size()]));
@@ -207,7 +213,7 @@
}
private void createDatasets(IStatementExecutor statementExecutor, Identifier subscriptionsName,
- Identifier resultsName, AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
+ Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
IHyracksDataset hdc, Stats stats, String dataverse) throws AsterixException, Exception {
Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
@@ -242,11 +248,11 @@
}
private JobSpecification createChannelJob(IStatementExecutor statementExecutor, Identifier subscriptionsName,
- Identifier resultsName, AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
+ Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
IHyracksDataset hdc, Stats stats, String dataverse) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("insert into dataset " + dataverse + "." + resultsName + " ");
- builder.append(" (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime() \n");
+ builder.append(" as $a (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime() \n");
builder.append("for $sub in dataset " + dataverse + "." + subscriptionsName + "\n");
builder.append(
@@ -266,7 +272,7 @@
builder.append("\"result\":$result");
builder.append("}");
builder.append(")");
- builder.append(" return records");
+ builder.append(" returning $a");
builder.append(";");
AQLParserFactory aqlFact = new AQLParserFactory();
List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
@@ -274,18 +280,42 @@
hcc, hdc, ResultDelivery.ASYNC, stats, true);
}
+ private void setupCompiledJob(MetadataProvider metadataProvider, String dataverse, EntityId entityId,
+ JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception {
+ ICCApplicationContext iCCApp = AsterixAppContextInfo.INSTANCE.getCCApplicationContext();
+ ClusterControllerInfo ccInfo = iCCApp.getCCContext().getClusterControllerInfo();
+ String strIP = ccInfo.getClientNetAddress();
+ int port = ccInfo.getClientNetPort();
+ //Create Channel Operator
+ Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> alteredJobSpec = buildChannelJobSpec(dataverse,
+ channelName.getValue(), duration, metadataProvider, channeljobSpec, strIP, port);
+
+ ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE, alteredJobSpec.first);
+ alteredJobSpec.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
+ JobUtils.runJob(hcc, alteredJobSpec.first, false);
+ }
+
+ private void setupDistributedJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc)
+ throws Exception {
+ ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE, channeljobSpec);
+ channeljobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
+ JobId jobId = hcc.startJob(channeljobSpec, EnumSet.of(JobFlag.STORE_JOB));
+ ChannelJobService.startJob(channeljobSpec, EnumSet.of(JobFlag.STORE_JOB), jobId, hcc,
+ ChannelJobService.findPeriod(duration));
+ }
+
@Override
- public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
//This function performs three tasks:
//1. Create datasets for the Channel
- //2. Create the compiled Channel Job
+ //2. Create and run the Channel Job
//3. Create the metadata entry for the channel
//TODO: Figure out how to handle when a subset of the 3 tasks fails
- //TODO: The compiled job will break if anything changes to the function or two datasets
+ //TODO: The compiled job will break if anything changes on the function or two datasets
// Need to make sure we do proper checking when altering these things
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
@@ -341,19 +371,11 @@
JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
metadataProvider, hcc, hdc, stats, dataverse);
- //Create Channel Operator
- ICCApplicationContext iCCApp = AsterixAppContextInfo.INSTANCE.getCCApplicationContext();
- ClusterControllerInfo ccInfo = iCCApp.getCCContext().getClusterControllerInfo();
- String strIP = ccInfo.getClientNetAddress();
- int port = ccInfo.getClientNetPort();
- Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> alteredJobSpec = buildChannelJobSpec(
- dataverse, channelName.getValue(), duration, metadataProvider, channeljobSpec, strIP, port);
-
- ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE,
- alteredJobSpec.first);
- alteredJobSpec.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
- JobUtils.runJob(hcc, alteredJobSpec.first, false);
-
+ if (distributed) {
+ setupDistributedJob(entityId, channeljobSpec, hcc);
+ } else {
+ setupCompiledJob(metadataProvider, dataverse, entityId, channeljobSpec, hcc);
+ }
eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
new file mode 100644
index 0000000..3edc7dc
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.lang.aql.parser.AQLParserFactory;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class CreateProcedureStatement implements IExtensionStatement {
+
+ private static final Logger LOGGER = Logger.getLogger(CreateProcedureStatement.class.getName());
+
+ private final FunctionSignature signature;
+ private final String functionBody;
+ private final List<String> paramList;
+
+ public FunctionSignature getaAterixFunction() {
+ return signature;
+ }
+
+ public String getFunctionBody() {
+ return functionBody;
+ }
+
+ public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList,
+ String functionBody) {
+ this.signature = signature;
+ this.functionBody = functionBody;
+ this.paramList = new ArrayList<String>();
+ for (VarIdentifier varId : parameterList) {
+ this.paramList.add(varId.getValue());
+ }
+ }
+
+ @Override
+ public byte getKind() {
+ return Kind.EXTENSION;
+ }
+
+ public List<String> getParamList() {
+ return paramList;
+ }
+
+ public FunctionSignature getSignature() {
+ return signature;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.DDL;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+ return null;
+ }
+
+ private JobSpecification createProcedureJob(String body, IStatementExecutor statementExecutor,
+ MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats)
+ throws Exception {
+ StringBuilder builder = new StringBuilder();
+ builder.append(body);
+ builder.append(";");
+ AQLParserFactory aqlFact = new AQLParserFactory();
+ List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
+ if (fStatements.size() > 1) {
+ throw new Exception("Procedure can only execute a single statement");
+ }
+ return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0),
+ hcc, hdc, ResultDelivery.ASYNC, stats, true);
+ }
+
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+ int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+ String dataverse =
+ ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
+
+ EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
+ ChannelEventsListener listener =
+ (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+ IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
+ boolean subscriberRegistered = false;
+ Procedure procedure = null;
+
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(),
+ Integer.toString(signature.getArity()));
+ if (procedure != null) {
+ throw new AlgebricksException("A procedure with this name " + signature.getName() + " already exists.");
+ }
+ if (listener != null) {
+ subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+ }
+ if (subscriberRegistered) {
+ throw new AsterixException("Procedure " + signature.getName() + " is already running");
+ }
+
+ procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
+ Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL);
+
+ // Now we subscribe
+ if (listener == null) {
+ listener = new ChannelEventsListener(entityId);
+ ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+ }
+ listener.registerEventSubscriber(eventSubscriber);
+ subscriberRegistered = true;
+
+
+ //Create Procedure Internal Job
+ JobSpecification channeljobSpec =
+ createProcedureJob(getFunctionBody(), statementExecutor, metadataProvider, hcc, hdc, stats);
+
+ // setupDistributedJob(entityId, channeljobSpec, hcc);
+
+ eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+
+ MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (mdTxnCtx != null) {
+ QueryTranslator.abort(e, e, mdTxnCtx);
+ }
+ LOGGER.log(Level.WARNING, "Failed creating a procedure", e);
+ throw new HyracksDataException(e);
+ }
+
+ }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
similarity index 91%
rename from src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index 05ab4c6..7222b1a 100644
--- a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -57,6 +57,9 @@
public static final Datatype BAD_CHANNEL_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
BADConstants.RECORD_TYPENAME_CHANNEL, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, false);
+ public static final Datatype BAD_PROCEDURE_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ BADConstants.RECORD_TYPENAME_PROCEDURE, BADMetadataRecordTypes.PROCEDURE_RECORDTYPE, false);
+
@Override
public ExtensionId getId() {
return BAD_METADATA_EXTENSION_ID;
@@ -88,12 +91,14 @@
// enlist datasets
MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.CHANNEL_DATASET);
MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.BROKER_DATASET);
+ MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.PROCEDURE_DATASET);
if (MetadataBootstrap.isNewUniverse()) {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
try {
// add metadata datasets
MetadataBootstrap.insertMetadataDatasets(mdTxnCtx,
- new IMetadataIndex[] { BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET });
+ new IMetadataIndex[] { BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET,
+ BADMetadataIndexes.PROCEDURE_DATASET });
// insert default dataverse
// TODO prevent user from dropping this dataverse
// MetadataManager.INSTANCE.addDataverse(mdTxnCtx, BAD_DATAVERSE);
@@ -102,6 +107,7 @@
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_SUBSCRIPTION_DATATYPE);
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_DATATYPE);
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_DATATYPE);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_PROCEDURE_DATATYPE);
// TODO prevent user from dropping these types
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
similarity index 74%
rename from src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
index 848fe78..b33dcad 100644
--- a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
@@ -44,8 +44,16 @@
MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1,
MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1);
+ public static final ExtensionMetadataDatasetId BAD_PROCEDURE_INDEX_ID = new ExtensionMetadataDatasetId(
+ BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.PROCEDURE_KEYWORD);
+ public static final MetadataIndexImmutableProperties PROPERTIES_PROCEDURE =
+ new MetadataIndexImmutableProperties(BADConstants.PROCEDURE_KEYWORD,
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 2,
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 2);
+
public static final int NUM_FIELDS_CHANNEL_IDX = 3;
public static final int NUM_FIELDS_BROKER_IDX = 3;
+ public static final int NUM_FIELDS_PROCEDURE_IDX = 4;
@SuppressWarnings({ "rawtypes", "unchecked" })
public static final ExtensionMetadataDataset CHANNEL_DATASET = new ExtensionMetadataDataset(PROPERTIES_CHANNEL,
@@ -63,4 +71,12 @@
0, BADMetadataRecordTypes.BROKER_RECORDTYPE, true, new int[] { 0, 1 }, BAD_BROKER_INDEX_ID,
new BrokerTupleTranslator(true));
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static final ExtensionMetadataDataset PROCEDURE_DATASET = new ExtensionMetadataDataset(PROPERTIES_PROCEDURE,
+ NUM_FIELDS_PROCEDURE_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+ Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+ Arrays.asList(BADConstants.ProcedureName), Arrays.asList(BADConstants.FIELD_NAME_ARITY)),
+ 0, BADMetadataRecordTypes.PROCEDURE_RECORDTYPE, true, new int[] { 0, 1, 2 }, BAD_PROCEDURE_INDEX_ID,
+ new ProcedureTupleTranslator(true));
+
}
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
similarity index 73%
rename from src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index cec98d0..6ee5735 100644
--- a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -20,6 +20,7 @@
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
+import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
@@ -74,4 +75,26 @@
//IsOpen?
true);
+ //----------------------------------------- Procedure ----------------------------------------//
+ public static final int PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX = 0;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX = 1;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX = 2;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX = 3;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX = 4;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX = 5;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX = 6;
+ public static final ARecordType PROCEDURE_RECORDTYPE = MetadataRecordTypes.createRecordType(
+ // RecordTypeName
+ BADConstants.RECORD_TYPENAME_PROCEDURE,
+ // FieldNames
+ new String[] { BADConstants.DataverseName, BADConstants.ProcedureName, BADConstants.FIELD_NAME_ARITY,
+ BADConstants.FIELD_NAME_PARAMS, BADConstants.FIELD_NAME_RETURN_TYPE,
+ BADConstants.FIELD_NAME_DEFINITION, BADConstants.FIELD_NAME_LANGUAGE },
+ // FieldTypes
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+ new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING, BuiltinType.ASTRING,
+ BuiltinType.ASTRING },
+ //IsOpen?
+ true);
+
}
diff --git a/src/main/java/org/apache/asterix/bad/metadata/Broker.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
similarity index 100%
rename from src/main/java/org/apache/asterix/bad/metadata/Broker.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
similarity index 100%
rename from src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
similarity index 96%
rename from src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
index 0a37c02..34397f4 100644
--- a/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
@@ -20,7 +20,7 @@
import java.io.DataInputStream;
import java.io.IOException;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
import org.apache.asterix.om.base.ARecord;
@@ -42,7 +42,8 @@
public static final int BROKER_PAYLOAD_TUPLE_FIELD_INDEX = 2;
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+ private ISerializerDeserializer<ARecord> recordSerDes =
+ SerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BADMetadataRecordTypes.BROKER_RECORDTYPE);
@SuppressWarnings("unchecked")
diff --git a/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
similarity index 100%
rename from src/main/java/org/apache/asterix/bad/metadata/Channel.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
diff --git a/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
similarity index 93%
rename from src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
index 82c97c8..a3c757b 100644
--- a/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
@@ -69,9 +69,12 @@
case JOB_FINISH:
handleJobFinishEvent(event);
break;
- default:
- LOGGER.warn("Unknown Channel Event" + event);
+ case PARTITION_EVENT:
+ LOGGER.warn("Partition Channel Event");
break;
+ default:
+ break;
+
}
} catch (Exception e) {
LOGGER.error("Unhandled Exception", e);
@@ -141,21 +144,16 @@
@Override
public void notifyJobCreation(JobId jobId, JobSpecification spec) {
- EntityId channelId = null;
try {
- for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
- if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
- channelId = ((RepetitiveChannelOperatorDescriptor) opDesc).getEntityId();
- registerJob(channelId, jobId, spec);
- return;
- }
- }
+ registerJob(jobId, spec);
+ return;
+
} catch (Exception e) {
LOGGER.error(e);
}
}
- public synchronized void registerJob(EntityId entityId, JobId jobId, JobSpecification jobSpec) {
+ public synchronized void registerJob(JobId jobId, JobSpecification jobSpec) {
if (jobs.get(jobId.getId()) != null) {
throw new IllegalStateException("Channel job already registered");
}
diff --git a/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
similarity index 100%
rename from src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
diff --git a/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
similarity index 97%
rename from src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index 18b2067..b9ae250 100644
--- a/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -21,7 +21,7 @@
import java.io.IOException;
import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
import org.apache.asterix.om.base.ARecord;
@@ -43,7 +43,7 @@
public static final int CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX = 2;
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+ private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BADMetadataRecordTypes.CHANNEL_RECORDTYPE);
@SuppressWarnings("unchecked")
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
new file mode 100644
index 0000000..527d65b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class DataverseBrokersSearchKey implements IExtensionMetadataSearchKey {
+ private static final long serialVersionUID = 1L;
+ private final String dataverse;
+
+ public DataverseBrokersSearchKey(String dataverse) {
+ this.dataverse = dataverse;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+ }
+
+ @Override
+ public ITupleReference getSearchKey() {
+ return MetadataNode.createTuple(dataverse);
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
new file mode 100644
index 0000000..ffb3ab6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class DataverseChannelsSearchKey implements IExtensionMetadataSearchKey {
+ private static final long serialVersionUID = 1L;
+ private final String dataverse;
+
+ public DataverseChannelsSearchKey(String dataverse) {
+ this.dataverse = dataverse;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+ }
+
+ @Override
+ public ITupleReference getSearchKey() {
+ return MetadataNode.createTuple(dataverse);
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
new file mode 100644
index 0000000..b64bf1b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.List;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+
+public class Procedure implements IExtensionMetadataEntity {
+ private static final long serialVersionUID = 1L;
+ public static final String LANGUAGE_AQL = "AQL";
+ public static final String LANGUAGE_JAVA = "JAVA";
+
+ public static final String RETURNTYPE_VOID = "VOID";
+ public static final String NOT_APPLICABLE = "N/A";
+
+ private final EntityId procedureId;
+ private final int arity;
+ private final List<String> params;
+ private final String body;
+ private final String returnType;
+ private final String language;
+
+ public Procedure(String dataverseName, String functionName, int arity, List<String> params, String returnType,
+ String functionBody, String language) {
+ this.procedureId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, functionName);
+ this.params = params;
+ this.body = functionBody;
+ this.returnType = returnType == null ? RETURNTYPE_VOID : returnType;
+ this.language = language;
+ this.arity = arity;
+ }
+
+ public EntityId getEntityId() {
+ return procedureId;
+ }
+
+ public List<String> getParams() {
+ return params;
+ }
+
+ public String getBody() {
+ return body;
+ }
+
+ public String getReturnType() {
+ return returnType;
+ }
+
+ public String getLanguage() {
+ return language;
+ }
+
+ public int getArity() {
+ return arity;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof Procedure)) {
+ return false;
+ }
+ Procedure otherDataset = (Procedure) other;
+ if (!otherDataset.procedureId.equals(procedureId)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID;
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
new file mode 100644
index 0000000..6456170
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class ProcedureSearchKey implements IExtensionMetadataSearchKey {
+ private static final long serialVersionUID = 1L;
+ private final String dataverse;
+ private final String channel;
+ private final String arity;
+
+ public ProcedureSearchKey(String dataverse, String channel, String arity) {
+ this.dataverse = dataverse;
+ this.channel = channel;
+ this.arity = arity;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID;
+ }
+
+ @Override
+ public ITupleReference getSearchKey() {
+ return MetadataNode.createTuple(dataverse, channel, arity);
+ }
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
new file mode 100644
index 0000000..f2eab9b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.IACursor;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Procedure metadata entity to an ITupleReference and vice versa.
+ */
+public class ProcedureTupleTranslator extends AbstractTupleTranslator<Procedure> {
+ // Field indexes of serialized Procedure in a tuple.
+ // First key field.
+ public static final int PROCEDURE_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
+ // Second key field.
+ public static final int PROCEDURE_PROCEDURE_NAME_TUPLE_FIELD_INDEX = 1;
+ // Third key field.
+ public static final int PROCEDURE_ARITY_TUPLE_FIELD_INDEX = 2;
+
+ // Payload field containing serialized Procedure.
+ public static final int PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX = 3;
+
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE);
+
+ protected ProcedureTupleTranslator(boolean getTuple) {
+ super(getTuple, BADMetadataIndexes.NUM_FIELDS_PROCEDURE_IDX);
+ }
+
+ @Override
+ public Procedure getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
+ byte[] serRecord = frameTuple.getFieldData(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordStartOffset = frameTuple.getFieldStart(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordLength = frameTuple.getFieldLength(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
+ ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+ DataInput in = new DataInputStream(stream);
+ ARecord procedureRecord = recordSerDes.deserialize(in);
+ return createProcedureFromARecord(procedureRecord);
+ }
+
+ private Procedure createProcedureFromARecord(ARecord procedureRecord) {
+ String dataverseName =
+ ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX))
+ .getStringValue();
+ String procedureName =
+ ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX))
+ .getStringValue();
+ String arity = ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX)).getStringValue();
+
+ IACursor cursor = ((AOrderedList) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX)).getCursor();
+ List<String> params = new ArrayList<String>();
+ while (cursor.next()) {
+ params.add(((AString) cursor.get()).getStringValue());
+ }
+
+ String returnType = ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX))
+ .getStringValue();
+
+ String definition = ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX))
+ .getStringValue();
+
+ String language = ((AString) procedureRecord
+ .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX))
+ .getStringValue();
+
+ return new Procedure(dataverseName, procedureName, Integer.parseInt(arity), params, returnType, definition,
+ language);
+
+ }
+
+ @Override
+ public ITupleReference getTupleFromMetadataEntity(Procedure procedure) throws IOException, MetadataException {
+ // write the key in the first 2 fields of the tuple
+ tupleBuilder.reset();
+ aString.setValue(procedure.getEntityId().getDataverse());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+ aString.setValue(procedure.getEntityId().getEntityName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+ aString.setValue(procedure.getArity() + "");
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ // write the pay-load in the fourth field of the tuple
+
+ recordBuilder.reset(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(procedure.getEntityId().getDataverse());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX, fieldValue);
+
+ // write field 1
+ fieldValue.reset();
+ aString.setValue(procedure.getEntityId().getEntityName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 2
+ fieldValue.reset();
+ aString.setValue(procedure.getArity() + "");
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX, fieldValue);
+
+ // write field 3
+ OrderedListBuilder listBuilder = new OrderedListBuilder();
+ ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage();
+ listBuilder.reset((AOrderedListType) BADMetadataRecordTypes.PROCEDURE_RECORDTYPE
+ .getFieldTypes()[BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX]);
+ for (String param : procedure.getParams()) {
+ itemValue.reset();
+ aString.setValue(param);
+ stringSerde.serialize(aString, itemValue.getDataOutput());
+ listBuilder.addItem(itemValue);
+ }
+ fieldValue.reset();
+ listBuilder.write(fieldValue.getDataOutput(), true);
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX, fieldValue);
+
+ // write field 4
+ fieldValue.reset();
+ aString.setValue(procedure.getReturnType());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX, fieldValue);
+
+ // write field 5
+ fieldValue.reset();
+ aString.setValue(procedure.getBody());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX, fieldValue);
+
+ // write field 6
+ fieldValue.reset();
+ aString.setValue(procedure.getLanguage());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX, fieldValue);
+
+ // write record
+ recordBuilder.write(tupleBuilder.getDataOutput(), true);
+ tupleBuilder.addFieldEndOffset();
+
+ tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ return tuple;
+ }
+
+}
diff --git a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
similarity index 95%
rename from src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 8e19fc0..89f0d20 100644
--- a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -49,9 +49,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -75,10 +75,10 @@
return false;
}
AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
- if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+ if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
return false;
}
- ExtensionOperator eOp = (ExtensionOperator) op;
+ DelegateOperator eOp = (DelegateOperator) op;
if (!(eOp.getDelegate() instanceof CommitOperator)) {
return false;
}
@@ -118,10 +118,6 @@
AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
//now brokerNameVar holds the brokerName for use farther up in the plan
- //Place assignOp between the scan and the op above it
- assignOp.getInputs().addAll(opAboveBrokersScan.getInputs());
- opAboveBrokersScan.getInputs().clear();
- opAboveBrokersScan.getInputs().add(new MutableObject<ILogicalOperator>(assignOp));
context.computeAndSetTypeEnvironmentForOperator(assignOp);
context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
context.computeAndSetTypeEnvironmentForOperator(eOp);
@@ -140,7 +136,7 @@
context.computeAndSetTypeEnvironmentForOperator(badProject);
//Create my brokerNotify plan above the extension Operator
- ExtensionOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar,
+ DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar,
context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName);
opRef.setValue(dOp);
@@ -148,7 +144,7 @@
return true;
}
- private ExtensionOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar,
+ private DelegateOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar,
LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext context,
ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName)
throws AlgebricksException {
@@ -196,7 +192,7 @@
EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
- ExtensionOperator extensionOp = new ExtensionOperator(notifyBrokerOp);
+ DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
extensionOp.setPhysicalOperator(notifyBrokerPOp);
extensionOp.getInputs().add(new MutableObject<ILogicalOperator>(groupbyOp));
@@ -221,10 +217,13 @@
Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint))));
DataSourceScanOperator brokerScan = null;
+ int index = 0;
for (Mutable<ILogicalOperator> subOp : opAboveBrokersScan.getInputs()) {
if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
brokerScan = (DataSourceScanOperator) subOp.getValue();
+ break;
}
+ index++;
}
Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
new VariableReferenceExpression(brokerScan.getVariables().get(2)));
@@ -235,7 +234,14 @@
varArray.add(brokerEndpointVar);
ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(1);
exprArray.add(new MutableObject<ILogicalExpression>(fieldAccessByName));
- return new AssignOperator(varArray, exprArray);
+
+ AssignOperator assignOp = new AssignOperator(varArray, exprArray);
+
+ //Place assignOp between the scan and the op above it
+ assignOp.getInputs().add(new MutableObject<ILogicalOperator>(brokerScan));
+ opAboveBrokersScan.getInputs().set(index, new MutableObject<ILogicalOperator>(assignOp));
+
+ return assignOp;
}
/*This function searches for the needed op
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
similarity index 93%
rename from src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
index c680988..d281b49 100644
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -22,14 +22,14 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
/**
* A repetitive channel operator, which uses a Java timer to run a given query periodically
*/
-public class NotifyBrokerOperator extends AbstractExtensibleLogicalOperator {
+public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator {
private final LogicalVariable subscriptionIdVar;
private final LogicalVariable brokerEndpointVar;
private final LogicalVariable channelExecutionVar;
@@ -64,7 +64,7 @@
}
@Override
- public IOperatorExtension newInstance() {
+ public IOperatorDelegate newInstance() {
return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar);
}
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
similarity index 97%
rename from src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
index 753ece7..12d5ae2 100644
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -27,7 +27,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -48,7 +48,7 @@
@Override
public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.EXTENSION_OPERATOR;
+ return PhysicalOperatorTag.DELEGATE_OPERATOR;
}
@Override
@@ -73,7 +73,7 @@
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
- ExtensionOperator notify = (ExtensionOperator) op;
+ DelegateOperator notify = (DelegateOperator) op;
LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable();
LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
similarity index 90%
rename from src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index d55080c..8634e4c 100644
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -20,6 +20,7 @@
package org.apache.asterix.bad.runtime;
import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.asterix.active.ActiveManager;
@@ -34,7 +35,6 @@
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -62,11 +62,10 @@
private IScalarEvaluator eval2;
private final ActiveManager activeManager;
private final EntityId entityId;
- private ChannelJobService channelJobService;
public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
- EntityId activeJobId) throws AlgebricksException {
+ EntityId activeJobId) throws HyracksDataException {
this.tRef = new FrameTupleReference();
eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
eval1 = subEvalFactory.createScalarEvaluator(ctx);
@@ -74,7 +73,6 @@
this.activeManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
.getApplicationObject()).getActiveManager();
this.entityId = activeJobId;
- channelJobService = new ChannelJobService();
}
@Override
@@ -88,13 +86,11 @@
int nTuple = tAccess.getTupleCount();
for (int t = 0; t < nTuple; t++) {
tRef.reset(tAccess, t);
- try {
- eval0.evaluate(tRef, inputArg0);
- eval1.evaluate(tRef, inputArg1);
- eval2.evaluate(tRef, inputArg2);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
+
+ eval0.evaluate(tRef, inputArg0);
+ eval1.evaluate(tRef, inputArg1);
+ eval2.evaluate(tRef, inputArg2);
+
int serBrokerOffset = inputArg0.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di);
@@ -106,9 +102,14 @@
int resultSetOffset = inputArg2.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
- String executionTimeString = executionTime.toSimpleString();
+ String executionTimeString;
+ try {
+ executionTimeString = executionTime.toSimpleString();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
- channelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs,
+ ChannelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs,
executionTimeString);
}
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
similarity index 95%
rename from src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
index d5452d4..d5d05cf 100644
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
@@ -20,11 +20,11 @@
package org.apache.asterix.bad.runtime;
import org.apache.asterix.active.EntityId;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class NotifyBrokerRuntimeFactory implements IPushRuntimeFactory {
@@ -49,7 +49,7 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
return new NotifyBrokerRuntime(ctx, brokerEvalFactory, subEvalFactory, channelExecutionEvalFactory, entityId);
}
}
diff --git a/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
similarity index 90%
rename from src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
index f3b0a90..8093977 100644
--- a/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
@@ -19,7 +19,6 @@
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -61,11 +60,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
ActiveRuntimeId runtimeId = new ActiveRuntimeId(entityId,
RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition);
- try {
- return new RepetitiveChannelOperatorNodePushable(ctx, runtimeId, jobSpec, duration, strIP, port);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
+ return new RepetitiveChannelOperatorNodePushable(ctx, runtimeId, jobSpec, duration, strIP, port);
}
public String getDuration() {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
new file mode 100644
index 0000000..1bbe331
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.EnumSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class RepetitiveChannelOperatorNodePushable extends ActiveSourceOperatorNodePushable {
+
+ private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorNodePushable.class.getName());
+
+ private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ private final JobSpecification jobSpec;
+ private long duration;
+ private final HyracksConnection hcc;
+
+ public RepetitiveChannelOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId,
+ JobSpecification channeljobSpec, String duration, String strIP, int port) throws HyracksDataException {
+ super(ctx, runtimeId);
+ this.jobSpec = channeljobSpec;
+ this.duration = ChannelJobService.findPeriod(duration);
+ try {
+ hcc = new HyracksConnection(strIP, port);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+
+ @Override
+ protected void start() throws HyracksDataException, InterruptedException {
+ try {
+ scheduledExecutorService =
+ ChannelJobService.startJob(jobSpec, EnumSet.noneOf(JobFlag.class), null, hcc, duration);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ while (!scheduledExecutorService.isTerminated()) {
+
+ }
+
+ }
+
+ @Override
+ protected void abort() throws HyracksDataException, InterruptedException {
+ scheduledExecutorService.shutdown();
+ }
+
+}
diff --git a/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
similarity index 77%
rename from src/main/resources/lang-extension/lang.txt
rename to asterix-bad/src/main/resources/lang-extension/lang.txt
index 233ec97..94b4c78 100644
--- a/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -4,6 +4,7 @@
import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateProcedureStatement;
@merge
@@ -34,7 +35,7 @@
(
// merge area 2
before:
- after: | stmt = ChannelSpecification() | stmt = BrokerSpecification())
+ after: | stmt = ChannelSpecification() | stmt = BrokerSpecification() | stmt = ProcedureSpecification())
{
// merge area 3
}
@@ -55,8 +56,8 @@
{
stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists);
}
- | <BROKER> pairId = QualifiedName() ifExists = IfExists()
- {
+ | "broker" pairId = QualifiedName() ifExists = IfExists()
+ {
stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);
}
)
@@ -73,15 +74,16 @@
CreateChannelStatement ccs = null;
String fqFunctionName = null;
Expression period = null;
+ boolean distributed = false;
}
{
(
"repetitive" "channel" nameComponents = QualifiedName()
<USING> appliedFunction = FunctionSignature()
- "period" period = FunctionCallExpr()
+ "period" period = FunctionCallExpr() ("distributed" { distributed = true; })?
{
ccs = new CreateChannelStatement(nameComponents.first,
- nameComponents.second, appliedFunction, period);
+ nameComponents.second, appliedFunction, period, distributed);
}
)
{
@@ -89,6 +91,38 @@
}
}
+
+@new
+CreateProcedureStatement ProcedureSpecification() throws ParseException:
+{
+ Pair<Identifier,Identifier> nameComponents = null;
+ FunctionSignature signature;
+ List<VarIdentifier> paramList = new ArrayList<VarIdentifier>();
+ String functionBody;
+ Token beginPos;
+ Token endPos;
+ Expression functionBodyExpr;
+}
+{
+ "procedure" nameComponents = QualifiedName()
+ paramList = ParameterList()
+ <LEFTBRACE>
+ {
+ beginPos = token;
+ }
+ functionBodyExpr = Expression() <RIGHTBRACE>
+ {
+ endPos = token;
+ functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn);
+ signature = new FunctionSignature(nameComponents.first.toString(), nameComponents.second.toString(), paramList.size());
+ removeCurrentScope();
+ return new CreateProcedureStatement(signature, paramList, functionBody);
+ }
+}
+
+
+
+
@new
CreateBrokerStatement BrokerSpecification() throws ParseException:
{
@@ -98,7 +132,7 @@
}
{
(
- <BROKER> name = QualifiedName()
+ "broker" name = QualifiedName()
<AT> endPoint = StringLiteral()
{
cbs = new CreateBrokerStatement(name.first, name.second,endPoint);
@@ -169,10 +203,4 @@
{
return stmt;
}
-}
-
-<DEFAULT,IN_DBL_BRACE>
-TOKEN [IGNORE_CASE]:
-{
- <BROKER : "broker">
}
\ No newline at end of file
diff --git a/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
similarity index 100%
rename from src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
rename to asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
diff --git a/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
similarity index 89%
rename from src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
rename to asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
index 4949b34..ad2f1bf 100644
--- a/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
@@ -21,6 +21,8 @@
import java.io.File;
import java.util.logging.Logger;
+import org.apache.asterix.bad.lang.BADCompilationProvider;
+import org.apache.asterix.bad.lang.BADQueryTranslatorFactory;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.IdentitiyResolverFactory;
@@ -41,6 +43,9 @@
final File outdir = new File(PATH_ACTUAL);
outdir.mkdirs();
+ extensionLangCompilationProvider = new BADCompilationProvider();
+ statementExecutorFactory = new BADQueryTranslatorFactory();
+
integrationUtil.init(true);
// Set the node resolver to be the identity resolver that expects node names
// to be node controller ids; a valid assumption in test environment.
diff --git a/src/test/resources/conf/asterix-build-configuration.xml b/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
similarity index 100%
rename from src/test/resources/conf/asterix-build-configuration.xml
rename to asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
diff --git a/src/test/resources/conf/cluster.xml b/asterix-bad/src/test/resources/conf/cluster.xml
similarity index 100%
rename from src/test/resources/conf/cluster.xml
rename to asterix-bad/src/test/resources/conf/cluster.xml
diff --git a/src/test/resources/conf/hyracks-deployment.properties b/asterix-bad/src/test/resources/conf/hyracks-deployment.properties
similarity index 100%
rename from src/test/resources/conf/hyracks-deployment.properties
rename to asterix-bad/src/test/resources/conf/hyracks-deployment.properties
diff --git a/src/test/resources/conf/test.properties b/asterix-bad/src/test/resources/conf/test.properties
similarity index 100%
rename from src/test/resources/conf/test.properties
rename to asterix-bad/src/test/resources/conf/test.properties
diff --git a/src/test/resources/optimizerts/queries/channel/channel-create.aql b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql
similarity index 100%
rename from src/test/resources/optimizerts/queries/channel/channel-create.aql
rename to asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql
diff --git a/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
similarity index 91%
rename from src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
rename to asterix-bad/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
index 682bd6d..dcc98da 100644
--- a/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
+++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
@@ -4,9 +4,9 @@
* Date : Mar 2015
*/
-drop dataverse channels if exists;
-create dataverse channels;
-use dataverse channels;
+drop dataverse channels2 if exists;
+create dataverse channels2;
+use dataverse channels2;
create type TweetMessageTypeuuid as closed {
diff --git a/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
similarity index 91%
rename from src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
rename to asterix-bad/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
index 7cdec50..ed182ee 100644
--- a/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
+++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
@@ -4,9 +4,9 @@
* Date : Mar 2015
*/
-drop dataverse channels if exists;
-create dataverse channels;
-use dataverse channels;
+drop dataverse channels3 if exists;
+create dataverse channels3;
+use dataverse channels3;
create type TweetMessageTypeuuid as closed {
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
new file mode 100644
index 0000000..889af1f
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
@@ -0,0 +1,57 @@
+-- NOTIFY_BROKERS |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$53, $$1] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$53(ASC), $$1(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$53, $$1] |PARTITIONED|
+ -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$40(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$40] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$36] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$45, $$47][$$41, $$42] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$45, $$47] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$41, $$42] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
new file mode 100644
index 0000000..c5871f9
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
@@ -0,0 +1,71 @@
+-- NOTIFY_BROKERS |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$53, $$1] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$53(ASC), $$1(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$53, $$1] |PARTITIONED|
+ -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$40(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$40] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$36] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$45, $$47][$$41, $$42] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$45, $$47] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$41, $$42] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$5] |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- STREAM_PROJECT |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
new file mode 100644
index 0000000..bdb8734
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
@@ -0,0 +1,71 @@
+-- NOTIFY_BROKERS |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$53, $$1] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$53(ASC), $$1(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$53, $$1] |PARTITIONED|
+ -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$40(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$40] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$36] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$45, $$47][$$41, $$42] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$45, $$47] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$41, $$42] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+-- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- MATERIALIZE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql
diff --git a/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql
diff --git a/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql
diff --git a/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.ddl.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.ddl.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.ddl.aql
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.ddl.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.ddl.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.ddl.aql
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql
new file mode 100644
index 0000000..bd73c12
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql
@@ -0,0 +1,5 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel
+order by $result.ChannelName
+return $result;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.aql
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.2.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.2.update.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.2.update.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.2.update.aql
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.3.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.3.query.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.3.query.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.3.query.aql
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.update.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.update.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.update.aql
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.5.sleep.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.5.sleep.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.5.sleep.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.5.sleep.aql
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.6.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.6.update.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.6.update.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.6.update.aql
diff --git a/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.7.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.7.query.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.7.query.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.7.query.aql
diff --git a/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
similarity index 94%
rename from src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
index 41b036a..d84ea65 100644
--- a/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
@@ -31,4 +31,6 @@
return $tweet.message-text
};
+create broker brokerA at "http://www.notifyA.com";
+
create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql
new file mode 100644
index 0000000..8d7df53
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+subscribe to nearbyTweetChannel (point("30.0, 30.0"), "Live") on brokerA;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.update.aql
new file mode 100644
index 0000000..f3e19af
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.update.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+subscribe to nearbyTweetChannel (point("20.0, 20.0"), "Long") on brokerA;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.4.update.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.4.update.aql
new file mode 100644
index 0000000..b426495
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.4.update.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+subscribe to nearbyTweetChannel (point("10.0, 10.0"), "Prosper") on brokerA;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.query.aql b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.aql
similarity index 100%
rename from src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.query.aql
rename to asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.aql
diff --git a/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
similarity index 100%
rename from src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
rename to asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
diff --git a/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
similarity index 100%
rename from src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
rename to asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
diff --git a/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
similarity index 100%
rename from src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
rename to asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
new file mode 100644
index 0000000..1da5787
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
@@ -0,0 +1,2 @@
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", "SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", "ResultsDatasetName": "nearbyTweetChannel1Results", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", "SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", "ResultsDatasetName": "nearbyTweetChannel3Results", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
\ No newline at end of file
diff --git a/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.3.adm b/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.3.adm
similarity index 100%
rename from src/test/resources/runtimets/results/channel/room_occupants/room_occupants.3.adm
rename to asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.3.adm
diff --git a/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.7.adm b/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.7.adm
similarity index 100%
rename from src/test/resources/runtimets/results/channel/room_occupants/room_occupants.7.adm
rename to asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.7.adm
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm
new file mode 100644
index 0000000..d9268fb
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm
@@ -0,0 +1,3 @@
+"Live"
+"Long"
+"Prosper"
\ No newline at end of file
diff --git a/src/test/resources/runtimets/testsuite.xml b/asterix-bad/src/test/resources/runtimets/testsuite.xml
similarity index 100%
rename from src/test/resources/runtimets/testsuite.xml
rename to asterix-bad/src/test/resources/runtimets/testsuite.xml
diff --git a/asterix-opt-bom/pom.xml b/asterix-opt-bom/pom.xml
new file mode 100644
index 0000000..9ee75d0
--- /dev/null
+++ b/asterix-opt-bom/pom.xml
@@ -0,0 +1,47 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.asterix.bad</groupId>
+ <artifactId>asterix-opt</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+
+ <!-- project coordinates -->
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-opt-bom</artifactId>
+ <version>0.8.9-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <name>asterix-opt-bom</name>
+ <description>BAD Extension to AsterixDB</description>
+
+ <!-- any dependencies listed here will be included in -->
+ <!-- asterix-server, etc. binary assemblies -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.asterix.bad</groupId>
+ <artifactId>asterix-bad</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/pom.xml b/pom.xml
index 25cd9c3..ed105bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,196 +16,19 @@
! specific language governing permissions and limitations
! under the License.
!-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.asterix</groupId>
- <artifactId>apache-asterixdb</artifactId>
- <version>0.8.9-SNAPSHOT</version>
- </parent>
- <artifactId>asterix-opt</artifactId>
- <properties>
- <asterix.version>0.8.9-SNAPSHOT</asterix.version>
- </properties>
- <build>
- <plugins>
- <plugin>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.asterix.bad</groupId>
+ <artifactId>asterix-opt</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <parent>
<groupId>org.apache.asterix</groupId>
- <artifactId>asterix-grammar-extension-maven-plugin</artifactId>
- <version>${asterix.version}</version>
- <configuration>
- <base>${project.basedir}</base>
- <gbase>../asterix-lang-aql/src/main/javacc/AQL.jj</gbase>
- <gextension>src/main/resources/lang-extension/lang.txt</gextension>
- <output>target/generated-resources/javacc/grammar.jj</output>
- <parserClassName>BADAQLParser</parserClassName>
- <packageName>org.apache.asterix.bad.lang</packageName>
- </configuration>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>grammarix</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>javacc-maven-plugin</artifactId>
- <version>2.6</version>
- <executions>
- <execution>
- <id>javacc</id>
- <goals>
- <goal>javacc</goal>
- </goals>
- <configuration>
- <isStatic>false</isStatic>
- <javaUnicodeEscape>true</javaUnicodeEscape>
- <sourceDirectory>target/generated-resources/javacc</sourceDirectory>
- </configuration>
- </execution>
- <execution>
- <id>javacc-jjdoc</id>
- <goals>
- <goal>jjdoc</goal>
- </goals>
- <phase>process-sources</phase>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>${project.build.directory}/generated-sources/javacc/</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-grammar-extension-maven-plugin</artifactId>
- <versionRange>[${asterix.version},)</versionRange>
- <goals>
- <goal>grammarix</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore></ignore>
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>javacc-maven-plugin</artifactId>
- <versionRange>[2.6,)</versionRange>
- <goals>
- <goal>javacc</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore></ignore>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-om</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-test-support</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-runtime</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>algebricks-compiler</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-hdfs-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-test-framework</artifactId>
- <version>${asterix.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-active</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-algebra</artifactId>
- <version>${asterix.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-app</artifactId>
- <version>${asterix.version}</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-app</artifactId>
- <version>${asterix.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-common</artifactId>
- <version>${asterix.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
+ <artifactId>apache-asterixdb</artifactId>
+ <version>0.8.9-SNAPSHOT</version>
+ </parent>
+ <modules>
+ <module>asterix-bad</module>
+ <module>asterix-opt-bom</module>
+ </modules>
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/asterix/bad/BADConstants.java b/src/main/java/org/apache/asterix/bad/BADConstants.java
deleted file mode 100644
index d03df33..0000000
--- a/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad;
-
-public interface BADConstants {
- final String SubscriptionId = "subscriptionId";
- final String BrokerName = "BrokerName";
- final String ChannelName = "ChannelName";
- final String DataverseName = "DataverseName";
- final String BrokerEndPoint = "BrokerEndPoint";
- final String DeliveryTime = "deliveryTime";
- final String ResultId = "resultId";
- final String ChannelExecutionTime = "channelExecutionTime";
- final String ChannelSubscriptionsType = "ChannelSubscriptionsType";
- final String ChannelResultsType = "ChannelResultsType";
- final String ResultsDatasetName = "ResultsDatasetName";
- final String SubscriptionsDatasetName = "SubscriptionsDatasetName";
- final String CHANNEL_EXTENSION_NAME = "Channel";
- final String BROKER_KEYWORD = "Broker";
- final String RECORD_TYPENAME_BROKER = "BrokerRecordType";
- final String RECORD_TYPENAME_CHANNEL = "ChannelRecordType";
- final String subscriptionEnding = "Subscriptions";
- final String resultsEnding = "Results";
- final String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
- final String BAD_DATAVERSE_NAME = "Metadata";
- final String Duration = "Duration";
- final String Function = "Function";
-
- public enum ChannelJobType {
- REPETITIVE
- }
-}
diff --git a/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/src/main/java/org/apache/asterix/bad/ChannelJobService.java
deleted file mode 100644
index 8310f70..0000000
--- a/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad;
-
-import java.io.BufferedReader;
-import java.io.DataOutputStream;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.AUUID;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.json.JSONException;
-
-/**
- * Provides functionality for running channel jobs and communicating with Brokers
- */
-public class ChannelJobService {
-
- private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());
- IHyracksClientConnection hcc;
-
- public ChannelJobService() throws AsterixException {
-
- }
-
- public void runChannelJob(JobSpecification channeljobSpec, String strIP, int port) throws Exception {
- hcc = new HyracksConnection(strIP, port);
- JobId jobId = hcc.startJob(channeljobSpec);
- hcc.waitForCompletion(jobId);
- }
-
- public void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint,
- AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException {
- String formattedString;
- try {
- formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
- } catch (JSONException e) {
- throw new HyracksDataException(e);
- }
- sendMessage(brokerEndpoint, formattedString);
- }
-
- public String formatJSON(EntityId activeJobId, AOrderedList subscriptionIds, String channelExecutionTime)
- throws JSONException {
- String JSON = "{ \"dataverseName\":\"" + activeJobId.getDataverse() + "\", \"channelName\":\""
- + activeJobId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
- + channelExecutionTime + "\", \"subscriptionIds\":[";
- for (int i = 0; i < subscriptionIds.size(); i++) {
- AUUID subId = (AUUID) subscriptionIds.getItem(i);
- String subString = subId.toSimpleString();
- JSON += "\"" + subString + "\"";
- if (i < subscriptionIds.size() - 1) {
- JSON += ",";
- }
- }
- JSON += "]}";
- return JSON;
-
- }
-
- public static void sendMessage(String targetURL, String urlParameters) {
- HttpURLConnection connection = null;
- try {
- //Create connection
- URL url = new URL(targetURL);
- connection = (HttpURLConnection) url.openConnection();
- connection.setRequestMethod("POST");
- connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
-
- connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
- connection.setRequestProperty("Content-Language", "en-US");
-
- connection.setUseCaches(false);
- connection.setDoOutput(true);
-
- //Send message
- try {
- DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
- wr.writeBytes(urlParameters);
- wr.close();
- } catch (Exception e) {
- throw new AsterixException("Broker connection failed to write", e);
- }
-
- if (LOGGER.isLoggable(Level.INFO)) {
- int responseCode = connection.getResponseCode();
- LOGGER.info("\nSending 'POST' request to URL : " + url);
- LOGGER.info("Post parameters : " + urlParameters);
- LOGGER.info("Response Code : " + responseCode);
- }
-
- BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
- String inputLine;
- StringBuffer response = new StringBuffer();
-
- while ((inputLine = in.readLine()) != null) {
- response.append(inputLine);
- }
- in.close();
-
- if (LOGGER.isLoggable(Level.INFO)) {
- System.out.println(response.toString());
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (connection != null) {
- connection.disconnect();
- }
- }
- }
-
- @Override
- public String toString() {
- return "ChannelJobService";
- }
-
-}
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
deleted file mode 100644
index 1d4864f..0000000
--- a/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.lang;
-
-import java.util.List;
-
-import org.apache.asterix.app.cc.CompilerExtensionManager;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.common.app.SessionConfig;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.lang.common.base.Statement;
-
-public class BADStatementExecutor extends QueryTranslator {
-
- public BADStatementExecutor(List<Statement> aqlStatements, SessionConfig conf,
- ILangCompilationProvider compliationProvider, CompilerExtensionManager ccExtensionManager) {
- super(aqlStatements, conf, compliationProvider, ccExtensionManager);
- }
-
- /*
- @Override
- protected void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
- super(metadataProvider, stmt, hcc);
- //TODO: need to drop channels and brokers
- //TODO: need to check if datasets or functions are in use by channels
- }*/
-
-}
diff --git a/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java b/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
deleted file mode 100644
index 873d2e7..0000000
--- a/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.runtime;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
-import org.apache.asterix.bad.ChannelJobService;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class RepetitiveChannelOperatorNodePushable extends ActiveSourceOperatorNodePushable {
-
- private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorNodePushable.class.getName());
-
- private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
- private final JobSpecification jobSpec;
- private long duration;
- private ChannelJobService channelJobService;
- private String strIP;
- private int port;
-
- public RepetitiveChannelOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId,
- JobSpecification channeljobSpec, String duration, String strIP, int port) throws AsterixException {
- super(ctx, runtimeId);
- this.jobSpec = channeljobSpec;
- this.duration = findPeriod(duration);
- //TODO: we should share channelJobService as a single instance
- //And only create one hcc
- channelJobService = new ChannelJobService();
- this.strIP = strIP;
- this.port = port;
- }
-
- public void executeJob() throws Exception {
- LOGGER.info("Executing Job: " + runtimeId.toString());
- channelJobService.runChannelJob(jobSpec, strIP, port);
- }
-
- @Override
- public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
- throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void start() throws HyracksDataException, InterruptedException {
- scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- executeJob();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, duration, duration, TimeUnit.MILLISECONDS);
-
- while (!scheduledExecutorService.isTerminated()) {
-
- }
-
- }
-
- @Override
- protected void abort() throws HyracksDataException, InterruptedException {
- scheduledExecutorService.shutdown();
- }
-
- private long findPeriod(String duration) {
- //TODO: Allow Repetitive Channels to use YMD durations
- String hoursMinutesSeconds = "";
- if (duration.indexOf('T') != -1) {
- hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
- }
- double seconds = 0;
- if (hoursMinutesSeconds != "") {
- int pos = 0;
- if (hoursMinutesSeconds.indexOf('H') != -1) {
- Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
- seconds += (hours * 60 * 60);
- pos = hoursMinutesSeconds.indexOf('H') + 1;
-
- }
- if (hoursMinutesSeconds.indexOf('M') != -1) {
- Double minutes = Double
- .parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
- seconds += (minutes * 60);
- pos = hoursMinutesSeconds.indexOf('M') + 1;
- }
- if (hoursMinutesSeconds.indexOf('S') != -1) {
- Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
- seconds += (s);
- }
-
- }
- return (long) (seconds * 1000);
- }
-
-}
diff --git a/src/test/resources/optimizerts/results/channel/channel-create.plan b/src/test/resources/optimizerts/results/channel/channel-create.plan
deleted file mode 100644
index f597191..0000000
--- a/src/test/resources/optimizerts/results/channel/channel-create.plan
+++ /dev/null
@@ -1,30 +0,0 @@
--- COMMIT |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$22] |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- NESTED_LOOP |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- BROADCAST_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/src/test/resources/optimizerts/results/channel/channel-subscribe.plan b/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
deleted file mode 100644
index 4530923..0000000
--- a/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
+++ /dev/null
@@ -1,44 +0,0 @@
--- COMMIT |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$22] |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- NESTED_LOOP |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- BROADCAST_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
--- DISTRIBUTE_RESULT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- COMMIT |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$8] |PARTITIONED|
- -- ASSIGN |UNPARTITIONED|
- -- STREAM_PROJECT |UNPARTITIONED|
- -- ASSIGN |UNPARTITIONED|
- -- STREAM_PROJECT |UNPARTITIONED|
- -- ASSIGN |UNPARTITIONED|
- -- ASSIGN |UNPARTITIONED|
- -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
\ No newline at end of file
diff --git a/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan b/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
deleted file mode 100644
index a9e383a..0000000
--- a/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
+++ /dev/null
@@ -1,44 +0,0 @@
--- COMMIT |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$22] |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- NESTED_LOOP |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- BROADCAST_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
--- COMMIT |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- INSERT_DELETE |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- MATERIALIZE |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- BTREE_SEARCH |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql b/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql
deleted file mode 100644
index 9a1e170..0000000
--- a/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql
+++ /dev/null
@@ -1,3 +0,0 @@
-use dataverse channels;
-
-for $result in dataset Metadata.Channel return $result;
\ No newline at end of file
diff --git a/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql b/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql
deleted file mode 100644
index 6d35506..0000000
--- a/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql
+++ /dev/null
@@ -1,7 +0,0 @@
-use dataverse channels;
-
-subscribe to nearbyTweetChannel (point("30.0, 30.0"), "Live");
-
-subscribe to nearbyTweetChannel (point("20.0, 20.0"), "Long");
-
-subscribe to nearbyTweetChannel (point("10.0, 10.0"), "Prosper");
\ No newline at end of file
diff --git a/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm b/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
deleted file mode 100644
index 7307d37..0000000
--- a/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
+++ /dev/null
@@ -1,2 +0,0 @@
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", "SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", "ResultsDatasetName": "nearbyTweetChannel1Results", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
-, { "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", "SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", "ResultsDatasetName": "nearbyTweetChannel3Results", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
\ No newline at end of file
diff --git a/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm b/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm
deleted file mode 100644
index a2e74f1..0000000
--- a/src/test/resources/runtimets/results/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.adm
+++ /dev/null
@@ -1,3 +0,0 @@
-"Live"
-, "Long"
-, "Prosper"
\ No newline at end of file