Introduces Feeds 2.0
commit c3f577861fc705d848c1641605689cadd6973bae
Merge: ebc4cae fc0c2c0
Author: ramangrover29 <ramangrover29@gmail.com>
Date: Fri Jun 26 13:04:05 2015 -0700
Merge branch 'raman/feeds_2_release' of https://code.google.com/p/asterixdb-sandbox into raman/feeds_2_release
Conflicts:
asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
commit ebc4cae21a7302869f953df1ebda601e798d12d2
Author: ramangrover29 <ramangrover29@gmail.com>
Date: Sat Jun 20 17:14:45 2015 -0700
Introduces Feeds 2.0
Some of the prominent changes introduced are as follows:
a) Support for building a cascade network of feeds (via secondary feeds feature)
b) Feed Management Console for tracking active feeds and associated metrics
c) Support for elastic runtime for data ingestion
d) Improved fault-tolerance with support for logging of failed records
Documentation has been added at asterix-doc/src/site/markdown/feeds/
commit fc0c2c0549a6ee8b202e57607d2e110478cd57bb
Author: ramangrover29 <ramangrover29@gmail.com>
Date: Sat Jun 20 17:14:45 2015 -0700
Introduces Feeds 2.0
Some of the prominent changes introduced are as follows:
a) Support for building a cascade network of feeds (via secondary feeds feature)
b) Feed Management Console for tracking active feeds and associated metrics
c) Support for elastic runtime for data ingestion
d) Improved fault-tolerance with support for logging of failed records
Documentation has been added at asterix-doc/src/site/markdown/feeds/
Change-Id: I498f01c591a229aaf51cec43ab20f3e5c4f072f4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/297
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
index abac9df..b9c96d1 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
@@ -37,10 +37,14 @@
INDEX_DECL,
CREATE_DATAVERSE,
INDEX_DROP,
- CREATE_FEED,
+ CREATE_PRIMARY_FEED,
+ CREATE_SECONDARY_FEED,
DROP_FEED,
CONNECT_FEED,
DISCONNECT_FEED,
+ SUBSCRIBE_FEED,
+ CREATE_FEED_POLICY,
+ DROP_FEED_POLICY,
CREATE_FUNCTION,
FUNCTION_DROP,
COMPACT,
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
index 25acff8..5d9794e 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
@@ -32,9 +32,10 @@
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
@@ -75,6 +76,7 @@
this.varCounter = varCounter;
}
+ /*
public void initialize(MetadataTransactionContext mdTxnCtx, Dataset targetDataset, Feed sourceFeed)
throws MetadataException {
query = new Query();
@@ -91,9 +93,9 @@
}
}
- Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+ Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
try {
- factoryOutput = FeedUtil.getFeedFactoryAndOutput(sourceFeed, mdTxnCtx);
+ factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) sourceFeed, mdTxnCtx);
adapterOutputType = factoryOutput.second.getTypeName();
} catch (AlgebricksException ae) {
ae.printStackTrace();
@@ -135,7 +137,7 @@
throw new MetadataException(pe);
}
- }
+ }*/
public Identifier getDataverseName() {
return dataverseName;
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedPolicyStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedPolicyStatement.java
new file mode 100644
index 0000000..939d777
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedPolicyStatement.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+/**
+ * AQL statement node for creating a feed policy.
+ * The policy source is given either as a source policy name plus a property map,
+ * or as a source policy file; whichever form is not used is left null.
+ */
+public class CreateFeedPolicyStatement implements Statement {
+
+    private final String policyName;
+    private final String sourcePolicyName;
+    private final String sourcePolicyFile;
+    private final Map<String, String> properties;
+    private final String description;
+    private final boolean ifNotExists;
+
+    /** Creates the statement from a source policy name and a property map; no policy file is recorded. */
+    public CreateFeedPolicyStatement(String policyName, String sourcePolicyName, Map<String, String> properties,
+            String description, boolean ifNotExists) {
+        this(policyName, sourcePolicyName, null, properties, description, ifNotExists);
+    }
+
+    /** Creates the statement from a source policy file; no source policy name or properties are recorded. */
+    public CreateFeedPolicyStatement(String policyName, String sourcePolicyFile, String description, boolean ifNotExists) {
+        this(policyName, null, sourcePolicyFile, null, description, ifNotExists);
+    }
+
+    // Canonical constructor: the public overloads differ only in which arguments they pass as null.
+    private CreateFeedPolicyStatement(String policyName, String sourcePolicyName, String sourcePolicyFile,
+            Map<String, String> properties, String description, boolean ifNotExists) {
+        this.policyName = policyName;
+        this.sourcePolicyName = sourcePolicyName;
+        this.sourcePolicyFile = sourcePolicyFile;
+        this.properties = properties;
+        this.description = description;
+        this.ifNotExists = ifNotExists;
+    }
+
+    public String getPolicyName() {
+        return policyName;
+    }
+
+    public String getSourcePolicyName() {
+        return sourcePolicyName;
+    }
+
+    public String getSourcePolicyFile() {
+        return sourcePolicyFile;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public boolean getIfNotExists() {
+        return ifNotExists;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.CREATE_FEED_POLICY;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitCreateFeedPolicyStatement(this, arg);
+    }
+
+    @Override
+    public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
+        visitor.visit(this, arg);
+    }
+
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java
index 4e33f73..e189fd4 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java
@@ -14,47 +14,32 @@
*/
package edu.uci.ics.asterix.aql.expression;
-import java.util.Map;
-
import edu.uci.ics.asterix.aql.base.Statement;
import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-public class CreateFeedStatement implements Statement {
+public abstract class CreateFeedStatement implements Statement {
- private final Identifier dataverseName;
- private final Identifier feedName;
- private final String adapterName;
- private final Map<String, String> adapterConfiguration;
+ private final Pair<Identifier, Identifier> qName;
private final FunctionSignature appliedFunction;
private final boolean ifNotExists;
- public CreateFeedStatement(Identifier dataverseName, Identifier feedName, String adapterName,
- Map<String, String> adapterConfiguration, FunctionSignature appliedFunction, boolean ifNotExists) {
- this.feedName = feedName;
- this.dataverseName = dataverseName;
- this.adapterName = adapterName;
- this.adapterConfiguration = adapterConfiguration;
+ public CreateFeedStatement(Pair<Identifier, Identifier> qName, FunctionSignature appliedFunction,
+ boolean ifNotExists) {
+ this.qName = qName;
this.appliedFunction = appliedFunction;
this.ifNotExists = ifNotExists;
}
public Identifier getDataverseName() {
- return dataverseName;
+ return qName.first;
}
public Identifier getFeedName() {
- return feedName;
- }
-
- public String getAdapterName() {
- return adapterName;
- }
-
- public Map<String, String> getAdapterConfiguration() {
- return adapterConfiguration;
+ return qName.second;
}
public FunctionSignature getAppliedFunction() {
@@ -66,14 +51,10 @@
}
@Override
- public Kind getKind() {
- return Kind.CREATE_FEED;
- }
+ public abstract Kind getKind();
@Override
- public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
- return visitor.visitCreateFeedStatement(this, arg);
- }
+ public abstract <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException;
@Override
public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreatePrimaryFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreatePrimaryFeedStatement.java
new file mode 100644
index 0000000..810e508
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreatePrimaryFeedStatement.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+
+/**
+ * AQL statement node for creating a primary feed.
+ * Adds an adaptor name and adaptor configuration to the state kept by {@link CreateFeedStatement}.
+ */
+public class CreatePrimaryFeedStatement extends CreateFeedStatement implements Statement {
+
+    private final String adaptorName;
+    private final Map<String, String> adaptorConfiguration;
+
+    public CreatePrimaryFeedStatement(Pair<Identifier, Identifier> qName, String adaptorName,
+            Map<String, String> adaptorConfiguration, FunctionSignature appliedFunction, boolean ifNotExists) {
+        super(qName, appliedFunction, ifNotExists);
+        this.adaptorConfiguration = adaptorConfiguration;
+        this.adaptorName = adaptorName;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.CREATE_PRIMARY_FEED;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitCreatePrimaryFeedStatement(this, arg);
+    }
+
+    /** Returns the adaptor name given at construction. */
+    public String getAdaptorName() {
+        return adaptorName;
+    }
+
+    /** Returns the adaptor configuration map given at construction (not copied). */
+    public Map<String, String> getAdaptorConfiguration() {
+        return adaptorConfiguration;
+    }
+
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateSecondaryFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateSecondaryFeedStatement.java
new file mode 100644
index 0000000..7d5f72a
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateSecondaryFeedStatement.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+
+/**
+ * Represents the AQL statement for creating a secondary feed.
+ * A secondary feed is one that derives its data from another (primary/secondary) feed.
+ */
+public class CreateSecondaryFeedStatement extends CreateFeedStatement implements Statement {
+
+    /** The source feed that provides data for this secondary feed. */
+    private final Pair<Identifier, Identifier> sourceQName;
+
+    /**
+     * @param qName pair whose first element is the dataverse and second the name of the feed being created
+     * @param sourceQName pair whose first element is the dataverse (may be null) and second the name of the source feed
+     * @param appliedFunction function signature passed through to CreateFeedStatement
+     * @param ifNotExists flag passed through to CreateFeedStatement
+     */
+    public CreateSecondaryFeedStatement(Pair<Identifier, Identifier> qName, Pair<Identifier, Identifier> sourceQName,
+            FunctionSignature appliedFunction, boolean ifNotExists) {
+        super(qName, appliedFunction, ifNotExists);
+        this.sourceQName = sourceQName;
+    }
+
+    /**
+     * Returns the dataverse of the source feed. When the source name carries no dataverse,
+     * this statement's own dataverse is used; null is returned when neither is set.
+     */
+    public String getSourceFeedDataverse() {
+        if (sourceQName.first != null) {
+            // Read the identifier text through getValue(), the same accessor the fallback below uses.
+            return sourceQName.first.getValue();
+        }
+        Identifier ownDataverse = getDataverseName();
+        return ownDataverse != null ? ownDataverse.getValue() : null;
+    }
+
+    /** Returns the name of the source feed, or null when the source name is unset. */
+    public String getSourceFeedName() {
+        return sourceQName.second != null ? sourceQName.second.getValue() : null;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.CREATE_SECONDARY_FEED;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitCreateSecondaryFeedStatement(this, arg);
+    }
+
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/FeedPolicyDropStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/FeedPolicyDropStatement.java
new file mode 100644
index 0000000..a9e9ee1
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/FeedPolicyDropStatement.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+/**
+ * AQL statement node for dropping a feed policy, identified by dataverse and policy name.
+ */
+public class FeedPolicyDropStatement implements Statement {
+
+    private final Identifier dataverseName;
+    private final Identifier policyName;
+    // Assigned once in the constructor and never reassigned, so it is final like the other fields.
+    private final boolean ifExists;
+
+    public FeedPolicyDropStatement(Identifier dataverseName, Identifier policyName, boolean ifExists) {
+        this.dataverseName = dataverseName;
+        this.policyName = policyName;
+        this.ifExists = ifExists;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.DROP_FEED_POLICY;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getPolicyName() {
+        return policyName;
+    }
+
+    /** Returns the ifExists flag supplied at construction. */
+    public boolean getIfExists() {
+        return ifExists;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitDropFeedPolicyStatement(this, arg);
+    }
+
+    @Override
+    public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
+        visitor.visit(this, arg);
+    }
+
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/SubscribeFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/SubscribeFeedStatement.java
new file mode 100644
index 0000000..509a69a
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/SubscribeFeedStatement.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import java.io.StringReader;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
+import edu.uci.ics.asterix.aql.parser.AQLParser;
+import edu.uci.ics.asterix.aql.parser.ParseException;
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
+import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+
+/**
+ * Represents the AQL statement for subscribing to a feed.
+ * This AQL statement is private and may not be used by the end-user.
+ */
+public class SubscribeFeedStatement implements Statement {
+
+    private static final Logger LOGGER = Logger.getLogger(SubscribeFeedStatement.class.getName());
+    private final FeedConnectionRequest connectionRequest;
+    private Query query;
+    private int varCounter;
+    private final String[] locations;
+
+    public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
+
+    public SubscribeFeedStatement(String[] locations, FeedConnectionRequest subscriptionRequest) {
+        this.connectionRequest = subscriptionRequest;
+        this.varCounter = 0;
+        this.locations = locations;
+    }
+
+    /**
+     * Builds an AQL script that inserts the output of feed-collect into the target dataset,
+     * parses it, and stores the query of the resulting insert statement.
+     *
+     * @param mdTxnCtx metadata transaction context used for the feed and function lookups
+     * @throws MetadataException if a function is unknown, the applied function declares more than
+     *             one parameter, the output type cannot be determined, or the script fails to parse
+     * @throws IllegalStateException if the receiving feed is not found in the metadata
+     */
+    public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException {
+        this.query = new Query();
+        FeedId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId();
+        FeedId receivingFeedId = connectionRequest.getReceivingFeedId();
+        Feed subscriberFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, receivingFeedId.getDataverse(),
+                receivingFeedId.getFeedName());
+        if (subscriberFeed == null) {
+            // Report the name that was looked up; the lookup result is null on this path.
+            throw new IllegalStateException(" Subscriber feed " + receivingFeedId.getDataverse() + "."
+                    + receivingFeedId.getFeedName() + " not found.");
+        }
+
+        String feedOutputType = getOutputType(mdTxnCtx);
+        FunctionSignature appliedFunction = subscriberFeed.getAppliedFunction();
+        Function function = null;
+        if (appliedFunction != null) {
+            function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
+            if (function == null) {
+                // Report the requested signature; the lookup result is null on this path.
+                throw new MetadataException(" Unknown function " + appliedFunction);
+            } else if (function.getParams().size() > 1) {
+                throw new MetadataException(" Incompatible function: " + appliedFunction
+                        + " Number of arguments must be 1");
+            }
+        }
+
+        StringBuilder builder = new StringBuilder();
+        builder.append("use dataverse " + sourceFeedId.getDataverse() + ";\n");
+        builder.append("set" + " " + FunctionUtils.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n");
+        builder.append("set" + " " + FeedActivity.FeedActivityDetails.FEED_POLICY_NAME + " " + "'"
+                + connectionRequest.getPolicy() + "'" + ";\n");
+
+        builder.append("insert into dataset " + connectionRequest.getTargetDataset() + " ");
+        builder.append(" (" + " for $x in feed-collect ('" + sourceFeedId.getDataverse() + "'" + "," + "'"
+                + sourceFeedId.getFeedName() + "'" + "," + "'" + receivingFeedId.getFeedName()
+                + "'" + "," + "'" + connectionRequest.getSubscriptionLocation().name() + "'" + "," + "'"
+                + connectionRequest.getTargetDataset() + "'" + "," + "'" + feedOutputType + "'" + ")");
+
+        List<String> functionsToApply = connectionRequest.getFunctionsToApply();
+        if (functionsToApply == null || functionsToApply.isEmpty()) {
+            // No functions requested (absent or empty list): return the collected record unchanged.
+            builder.append(" return $x");
+        } else {
+            String rValueName = "x";
+            String lValueName = "y";
+            int variableIndex = 0;
+            for (String functionName : functionsToApply) {
+                // NOTE(review): every iteration looks up the subscriber feed's appliedFunction rather
+                // than functionName; confirm whether a per-function lookup is intended.
+                function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
+                if (function == null) {
+                    throw new MetadataException(" Unknown function " + functionName);
+                }
+                variableIndex++;
+                switch (function.getLanguage().toUpperCase()) {
+                    case Function.LANGUAGE_AQL:
+                        // NOTE(review): unlike the Java branch, rValueName is not advanced here; confirm.
+                        builder.append(" let " + "$" + lValueName + variableIndex + ":=(" + function.getFunctionBody()
+                                + ")");
+                        builder.append("\n");
+                        break;
+                    case Function.LANGUAGE_JAVA:
+                        builder.append(" let " + "$" + lValueName + variableIndex + ":=" + functionName + "(" + "$"
+                                + rValueName + ")");
+                        rValueName = lValueName + variableIndex;
+                        break;
+                }
+                builder.append("\n");
+            }
+            builder.append("return $" + lValueName + variableIndex);
+        }
+        builder.append(")");
+        builder.append(";");
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Connect feed statement translated to\n" + builder.toString());
+        }
+        AQLParser parser = new AQLParser(new StringReader(builder.toString()));
+
+        // The script is: use dataverse, set, set, insert; the insert is expected at index 3.
+        List<Statement> statements;
+        try {
+            statements = parser.Statement();
+            query = ((InsertStatement) statements.get(3)).getQuery();
+        } catch (ParseException pe) {
+            throw new MetadataException(pe);
+        }
+    }
+
+    public Query getQuery() {
+        return query;
+    }
+
+    public int getVarCounter() {
+        return varCounter;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.SUBSCRIBE_FEED;
+    }
+
+    public String getPolicy() {
+        return connectionRequest.getPolicy();
+    }
+
+    public FeedConnectionRequest getSubscriptionRequest() {
+        return connectionRequest;
+    }
+
+    /** Does not dispatch to the visitor; always returns null. */
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    /** Does not dispatch to the visitor; this is a no-op. */
+    @Override
+    public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
+    }
+
+    public String getDataverseName() {
+        return connectionRequest.getReceivingFeedId().getDataverse();
+    }
+
+    /**
+     * Looks up the receiving feed and returns its output type name: the adapter output record type
+     * name for a PRIMARY feed, or the FeedUtil.getSecondaryFeedOutput result for a SECONDARY feed.
+     * Returns null for any other feed type; AlgebricksException is rethrown as MetadataException.
+     */
+    private String getOutputType(MetadataTransactionContext mdTxnCtx) throws MetadataException {
+        String outputType = null;
+        FeedId feedId = connectionRequest.getReceivingFeedId();
+        Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getFeedName());
+        FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(connectionRequest.getPolicyParameters());
+        try {
+            switch (feed.getFeedType()) {
+                case PRIMARY:
+                    Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+
+                    factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) feed, policyAccessor,
+                            mdTxnCtx);
+                    outputType = factoryOutput.second.getTypeName();
+                    break;
+                case SECONDARY:
+                    outputType = FeedUtil.getSecondaryFeedOutput((SecondaryFeed) feed, policyAccessor, mdTxnCtx);
+                    break;
+            }
+            return outputType;
+
+        } catch (AlgebricksException ae) {
+            throw new MetadataException(ae);
+        }
+    }
+
+    public String[] getLocations() {
+        return locations;
+    }
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
index 21a2388..28acfda 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
@@ -25,6 +25,7 @@
import edu.uci.ics.asterix.aql.expression.CompactStatement;
import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
@@ -37,6 +38,7 @@
import edu.uci.ics.asterix.aql.expression.DropStatement;
import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
import edu.uci.ics.asterix.aql.expression.FieldAccessor;
import edu.uci.ics.asterix.aql.expression.FieldBinding;
import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -61,13 +63,13 @@
import edu.uci.ics.asterix.aql.expression.OrderbyClause;
import edu.uci.ics.asterix.aql.expression.OrderbyClause.OrderModifier;
import edu.uci.ics.asterix.aql.expression.OrderedListTypeDefinition;
-import edu.uci.ics.asterix.aql.expression.RunStatement;
import edu.uci.ics.asterix.aql.expression.QuantifiedExpression;
import edu.uci.ics.asterix.aql.expression.QuantifiedPair;
import edu.uci.ics.asterix.aql.expression.Query;
import edu.uci.ics.asterix.aql.expression.RecordConstructor;
import edu.uci.ics.asterix.aql.expression.RecordTypeDefinition;
import edu.uci.ics.asterix.aql.expression.RecordTypeDefinition.RecordKind;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
import edu.uci.ics.asterix.aql.expression.SetStatement;
import edu.uci.ics.asterix.aql.expression.TypeDecl;
import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
@@ -557,6 +559,15 @@
// TODO Auto-generated method stub
}
+
+ @Override
+ public void visit(CreateFeedPolicyStatement stmt, Integer arg) throws AsterixException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void visit(FeedPolicyDropStatement stmt, Integer arg) throws AsterixException {
+ }
@Override
public void visit(RunStatement stmt, Integer arg) throws AsterixException {
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java
index 84d9726..6716fff 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java
@@ -15,21 +15,24 @@
package edu.uci.ics.asterix.aql.expression.visitor;
import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
import edu.uci.ics.asterix.aql.expression.DatasetDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
import edu.uci.ics.asterix.aql.expression.DeleteStatement;
import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.DistinctClause;
-import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
import edu.uci.ics.asterix.aql.expression.DropStatement;
import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
+import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
import edu.uci.ics.asterix.aql.expression.FieldAccessor;
import edu.uci.ics.asterix.aql.expression.ForClause;
import edu.uci.ics.asterix.aql.expression.FunctionDecl;
@@ -157,9 +160,15 @@
R visitConnectFeedStatement(ConnectFeedStatement del, T arg) throws AsterixException;
- R visitCreateFeedStatement(CreateFeedStatement del, T arg) throws AsterixException;
+ R visitCreatePrimaryFeedStatement(CreatePrimaryFeedStatement cpfs, T arg) throws AsterixException;
+
+ R visitCreateSecondaryFeedStatement(CreateSecondaryFeedStatement csfs, T arg) throws AsterixException;
R visitDropFeedStatement(FeedDropStatement del, T arg) throws AsterixException;
+
+ R visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs, T arg) throws AsterixException;
+
+ R visitCreateFeedPolicyStatement(CreateFeedPolicyStatement cfps, T arg) throws AsterixException;
R visitCallExpr(CallExpr pf, T arg) throws AsterixException;
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
index f04c3a0..84e0825 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
@@ -18,6 +18,7 @@
import edu.uci.ics.asterix.aql.expression.CompactStatement;
import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
@@ -30,6 +31,7 @@
import edu.uci.ics.asterix.aql.expression.DropStatement;
import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
import edu.uci.ics.asterix.aql.expression.FieldAccessor;
import edu.uci.ics.asterix.aql.expression.ForClause;
import edu.uci.ics.asterix.aql.expression.FunctionDecl;
@@ -49,11 +51,11 @@
import edu.uci.ics.asterix.aql.expression.OperatorExpr;
import edu.uci.ics.asterix.aql.expression.OrderbyClause;
import edu.uci.ics.asterix.aql.expression.OrderedListTypeDefinition;
-import edu.uci.ics.asterix.aql.expression.RunStatement;
import edu.uci.ics.asterix.aql.expression.QuantifiedExpression;
import edu.uci.ics.asterix.aql.expression.Query;
import edu.uci.ics.asterix.aql.expression.RecordConstructor;
import edu.uci.ics.asterix.aql.expression.RecordTypeDefinition;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
import edu.uci.ics.asterix.aql.expression.SetStatement;
import edu.uci.ics.asterix.aql.expression.TypeDecl;
import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
@@ -164,8 +166,12 @@
void visit(CreateFeedStatement stmt, T arg) throws AsterixException;
+ void visit(CreateFeedPolicyStatement stmt, T arg) throws AsterixException;
+
void visit(FeedDropStatement stmt, T arg) throws AsterixException;
+ void visit(FeedPolicyDropStatement stmt, T arg) throws AsterixException;
+
void visit(CreateFunctionStatement cfs, T arg) throws AsterixException;
void visit(FunctionDropStatement fds, T arg) throws AsterixException;
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java
index c416fa6..6b1df98 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java
@@ -25,21 +25,24 @@
import edu.uci.ics.asterix.aql.base.Expression;
import edu.uci.ics.asterix.aql.base.Expression.Kind;
import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
import edu.uci.ics.asterix.aql.expression.DatasetDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
import edu.uci.ics.asterix.aql.expression.DeleteStatement;
+import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.DistinctClause;
import edu.uci.ics.asterix.aql.expression.DropStatement;
import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
import edu.uci.ics.asterix.aql.expression.FieldAccessor;
import edu.uci.ics.asterix.aql.expression.FieldBinding;
import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -89,8 +92,6 @@
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.AsterixFunction;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
public final class AqlRewriter {
@@ -553,8 +554,13 @@
}
@Override
- public Void visitCreateFeedStatement(CreateFeedStatement del, Void arg) throws AsterixException {
- // TODO Auto-generated method stub
+ public Void visitCreatePrimaryFeedStatement(CreatePrimaryFeedStatement del, Void arg) throws AsterixException {
+ return null; // no-op: del is not inspected by this visitor; null is the only possible Void value
+ }
+
+ @Override
+ public Void visitCreateSecondaryFeedStatement(CreateSecondaryFeedStatement del, Void arg)
+ throws AsterixException {
return null;
}
@@ -575,6 +581,18 @@
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public Void visitCreateFeedPolicyStatement(CreateFeedPolicyStatement cfps, Void arg) throws AsterixException {
+ // TODO: unimplemented stub - cfps is ignored and null is always returned. NOTE(review): confirm nothing needs rewriting here.
+ return null;
+ }
+
+ @Override
+ public Void visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs, Void arg) throws AsterixException {
+ // TODO: unimplemented stub - dfs is ignored and null is always returned. NOTE(review): confirm nothing needs rewriting here.
+ return null;
+ }
}
}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java
index edb080c..f03799a 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java
@@ -22,21 +22,24 @@
import edu.uci.ics.asterix.aql.base.Expression;
import edu.uci.ics.asterix.aql.base.IAqlExpression;
import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
import edu.uci.ics.asterix.aql.expression.DatasetDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
import edu.uci.ics.asterix.aql.expression.DeleteStatement;
+import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.DistinctClause;
import edu.uci.ics.asterix.aql.expression.DropStatement;
import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
import edu.uci.ics.asterix.aql.expression.FieldAccessor;
import edu.uci.ics.asterix.aql.expression.FieldBinding;
import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -588,8 +591,15 @@
}
@Override
- public Pair<IAqlExpression, List<VariableSubstitution>> visitCreateFeedStatement(CreateFeedStatement del,
- List<VariableSubstitution> arg) throws AsterixException {
+ public Pair<IAqlExpression, List<VariableSubstitution>> visitCreatePrimaryFeedStatement(
+ CreatePrimaryFeedStatement del, List<VariableSubstitution> arg) throws AsterixException {
+ // TODO: unimplemented stub - del and arg are ignored; returns null instead of an (expression, substitutions) pair.
+ return null;
+ }
+
+ @Override
+ public Pair<IAqlExpression, List<VariableSubstitution>> visitCreateSecondaryFeedStatement(
+ CreateSecondaryFeedStatement del, List<VariableSubstitution> arg) throws AsterixException {
// TODO Auto-generated method stub
return null;
}
@@ -614,4 +624,17 @@
// TODO Auto-generated method stub
return null;
}
+
+
+ @Override
+ public Pair<IAqlExpression, List<VariableSubstitution>> visitCreateFeedPolicyStatement(
+ CreateFeedPolicyStatement cfps, List<VariableSubstitution> arg) throws AsterixException {
+ return null; // stub: cfps and arg are ignored; null is returned instead of an (expression, substitutions) pair
+ }
+
+ @Override
+ public Pair<IAqlExpression, List<VariableSubstitution>> visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs,
+ List<VariableSubstitution> arg) throws AsterixException {
+ return null; // stub: dfs and arg are ignored; null is returned instead of an (expression, substitutions) pair
+ }
}
\ No newline at end of file
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java
index 58433d3..394b507 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java
@@ -23,21 +23,24 @@
import edu.uci.ics.asterix.aql.base.Expression.Kind;
import edu.uci.ics.asterix.aql.base.IAqlExpression;
import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
import edu.uci.ics.asterix.aql.expression.DatasetDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
import edu.uci.ics.asterix.aql.expression.DeleteStatement;
+import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.DistinctClause;
import edu.uci.ics.asterix.aql.expression.DropStatement;
import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
import edu.uci.ics.asterix.aql.expression.FieldAccessor;
import edu.uci.ics.asterix.aql.expression.FieldBinding;
import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -522,7 +525,15 @@
}
@Override
- public Boolean visitCreateFeedStatement(CreateFeedStatement del, List<FunctionDecl> arg) throws AsterixException {
+ public Boolean visitCreatePrimaryFeedStatement(CreatePrimaryFeedStatement del, List<FunctionDecl> arg)
+ throws AsterixException {
+ // TODO: unimplemented stub - del and arg are ignored; the result is a null Boolean, not Boolean.FALSE.
+ return null; // NOTE(review): a caller that auto-unboxes this result would throw NullPointerException - confirm callers
+ }
+
+ @Override
+ public Boolean visitCreateSecondaryFeedStatement(CreateSecondaryFeedStatement del, List<FunctionDecl> arg)
+ throws AsterixException {
// TODO Auto-generated method stub
return null;
}
@@ -544,4 +555,18 @@
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public Boolean visitCreateFeedPolicyStatement(CreateFeedPolicyStatement cfps, List<FunctionDecl> arg)
+ throws AsterixException {
+ // TODO: unimplemented stub - cfps and arg are ignored; the result is a null Boolean, not Boolean.FALSE.
+ return null; // NOTE(review): a caller that auto-unboxes this result would throw NullPointerException - confirm callers
+ }
+
+ @Override
+ public Boolean visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs, List<FunctionDecl> arg)
+ throws AsterixException {
+ // TODO: unimplemented stub - dfs and arg are ignored; the result is a null Boolean, not Boolean.FALSE.
+ return null; // NOTE(review): a caller that auto-unboxes this result would throw NullPointerException - confirm callers
+ }
}
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 8150871..7044962 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -37,6 +37,9 @@
import edu.uci.ics.asterix.aql.expression.CallExpr;
import edu.uci.ics.asterix.aql.expression.CompactStatement;
import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
@@ -336,6 +339,7 @@
| stmt = DataverseSpecification()
| stmt = FunctionSpecification()
| stmt = FeedSpecification()
+ | stmt = FeedPolicySpecification()
)
{
return stmt;
@@ -653,24 +657,61 @@
Map<String,String> properties = null;
FunctionSignature appliedFunction = null;
CreateFeedStatement cfs = null;
+ Pair<Identifier,Identifier> sourceNameComponents = null;
+
}
{
(
- "feed" nameComponents = QualifiedName()
- ifNotExists = IfNotExists()
- "using" adapterName = AdapterName() properties = Configuration()
- (appliedFunction = ApplyFunction())?
+ "secondary" "feed" nameComponents = QualifiedName() ifNotExists = IfNotExists()
+ <FROM> "feed" sourceNameComponents = QualifiedName() (appliedFunction = ApplyFunction())?
{
- cfs = new CreateFeedStatement(nameComponents.first,
- nameComponents.second, adapterName, properties, appliedFunction, ifNotExists);
+ cfs = new CreateSecondaryFeedStatement(nameComponents,
+ sourceNameComponents, appliedFunction, ifNotExists);
}
-
+ |
+ ("primary")? "feed" nameComponents = QualifiedName() ifNotExists = IfNotExists()
+ "using" adapterName = AdapterName() properties = Configuration() (appliedFunction = ApplyFunction())?
+ {
+ cfs = new CreatePrimaryFeedStatement(nameComponents,
+ adapterName, properties, appliedFunction, ifNotExists);
+ }
)
{
return cfs;
}
}
+CreateFeedPolicyStatement FeedPolicySpecification() throws ParseException: // parses: "ingestion" "policy" Identifier() IfNotExists() <FROM> ( "policy" ... | "path" ... ) and builds the statement object
+{
+ String policyName = null; // name of the policy being declared
+ String basePolicyName = null; // set only by the "policy" alternative
+ String sourcePolicyFile = null; // set only by the "path" alternative
+ String definition = null; // optional text following "definition"; stays null when the clause is absent
+ boolean ifNotExists = false;
+ Map<String,String> properties = null; // Configuration() result; set only by the "policy" alternative
+ CreateFeedPolicyStatement cfps = null;
+}
+{
+ (
+ "ingestion" "policy" policyName = Identifier() ifNotExists = IfNotExists()
+ <FROM>
+ ("policy" basePolicyName = Identifier() properties = Configuration() ("definition" definition = StringLiteral())? // alternative 1: named base policy plus a Configuration() map
+ {
+ cfps = new CreateFeedPolicyStatement(policyName,
+ basePolicyName, properties, definition, ifNotExists);
+ }
+ | "path" sourcePolicyFile = Identifier() ("definition" definition = StringLiteral())? // alternative 2: policy file. NOTE(review): the path is matched by Identifier(), not StringLiteral() - confirm real file paths can be written this way
+ {
+ cfps = new CreateFeedPolicyStatement(policyName, sourcePolicyFile, definition, ifNotExists); // 4-argument constructor: no base policy and no properties
+ }
+ )
+
+ )
+ {
+ return cfps; // assigned by whichever of the two alternatives matched
+ }
+}
+
List<VarIdentifier> ParameterList() throws ParseException: