[ASTERIXDB-2227][ING] Enabling filitering incoming data in feed
- user model changes: yes
Add syntax support for specifying predicate in connect feed
- storage format changes: no
- interface changes: no
Details:
In some use cases, a user may want to filter the incombing data with
certain attributes. One example can be only store the incoming tweets
with geo locations. This patch enables the <WHERE> clause in connect
feed statement. User can subset the incoming data using following
syntax:
connect feed feeds.TweetFeed to dataset Tweets3 using policy `Basic`
WHERE id NOT LIKE 'nc1:10%' OR username = 'BronsonMike';
Change-Id: I0b3cc6fe9d7fb5f5645dd9c759da448bfe1e88f1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2255
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b64f828..aabb7c2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2181,6 +2181,7 @@
String feedName = cfs.getFeedName();
String datasetName = cfs.getDatasetName().getValue();
String policyName = cfs.getPolicy();
+ String whereClauseBody = cfs.getWhereClauseBody();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
// TODO: Check whether we are connecting a change feed to a non-meta dataset
@@ -2213,7 +2214,7 @@
if (fc != null) {
throw new AlgebricksException("Feed" + feedName + " is already connected dataset " + datasetName);
}
- fc = new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName,
+ fc = new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, whereClauseBody,
outputType.getTypeName());
MetadataManager.INSTANCE.addFeedConnection(metadataProvider.getMetadataTxnContext(), fc);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index b6371dc..424444a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -40,6 +40,7 @@
import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.transactions.TxnId;
@@ -58,8 +59,11 @@
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.clause.LetClause;
+import org.apache.asterix.lang.common.clause.WhereClause;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.LiteralExpr;
import org.apache.asterix.lang.common.expression.VariableExpr;
@@ -78,6 +82,7 @@
import org.apache.asterix.lang.sqlpp.clause.SelectElement;
import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.parser.SqlppParserFactory;
import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -192,7 +197,7 @@
return argExprs;
}
- private static Query makeConnectionQuery(FeedConnection feedConnection) {
+ private static Query makeConnectionQuery(FeedConnection feedConnection) throws AlgebricksException {
// Construct from clause
VarIdentifier fromVarId = SqlppVariableUtil.toInternalVariableIdentifier(feedConnection.getFeedName());
VariableExpr fromTermLeftExpr = new VariableExpr(fromVarId);
@@ -204,6 +209,19 @@
CallExpr datasrouceCallFunction = new CallExpr(new FunctionSignature(BuiltinFunctions.FEED_COLLECT), exprList);
FromTerm fromterm = new FromTerm(datasrouceCallFunction, fromTermLeftExpr, null, null);
FromClause fromClause = new FromClause(Arrays.asList(fromterm));
+ WhereClause whereClause = null;
+ if (feedConnection.getWhereClauseBody().length() != 0) {
+ String whereClauseExpr = feedConnection.getWhereClauseBody() + ";";
+ IParserFactory sqlppParserFactory = new SqlppParserFactory();
+ IParser sqlppParser = sqlppParserFactory.createParser(whereClauseExpr);
+ List<Statement> stmts = sqlppParser.parse();
+ if (stmts.size() != 1) {
+ throw new CompilationException("Exceptions happened in processing where clause.");
+ }
+ Query whereClauseQuery = (Query) stmts.get(0);
+ whereClause = new WhereClause(whereClauseQuery.getBody());
+ }
+
// TODO: This can be the place to add select predicate for ingestion
// Attaching functions
int varIdx = 1;
@@ -222,7 +240,7 @@
// Constructing select clause
SelectElement selectElement = new SelectElement(previousVarExpr);
SelectClause selectClause = new SelectClause(selectElement, null, false);
- SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, letClauses, null, null, null, null);
+ SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, letClauses, whereClause, null, null, null);
SelectSetOperation selectSetOperation = new SelectSetOperation(new SetOperationInput(selectBlock, null), null);
SelectExpression body = new SelectExpression(null, selectSetOperation, null, null, true);
Query query = new Query(false, true, body, 0);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
new file mode 100644
index 0000000..412ce03
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use feeds;
+
+create type feeds.TweetType as
+ closed {
+ id : string,
+ username : string,
+ location : string,
+ text : string,
+ timestamp : string
+};
+
+create dataset Tweets1(TweetType) primary key id;
+create dataset Tweets2(TweetType) primary key id;
+create dataset Tweets3(TweetType) primary key id;
+create dataset Tweets4(TweetType) primary key id;
+
+create feed TweetFeed with {
+ "adapter-name" : "localfs",
+ "path":"asterix_nc1://data/twitter/obamatweets.adm",
+ "format":"adm",
+ "type-name":"TweetType",
+ "tuple-interval":"10"
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
new file mode 100644
index 0000000..dd83b35
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use feeds;
+set `wait-for-completion-feed` `true`;
+connect feed feeds.TweetFeed to dataset Tweets1 using policy `Basic` WHERE id = 'nc1:115';
+connect feed feeds.TweetFeed to dataset Tweets2 using policy `Basic` WHERE id LIKE 'nc1:11%';
+connect feed feeds.TweetFeed to dataset Tweets3 using policy `Basic` WHERE id NOT LIKE 'nc1:10%' OR username = 'BronsonMike';
+connect feed feeds.TweetFeed to dataset Tweets4 using policy `Basic` WHERE id LIKE 'nc1:11%' AND username = 'thewildpitch';
+
+start feed feeds.TweetFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
new file mode 100644
index 0000000..20c6320
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use feeds;
+
+select value x
+from Tweets1 as x
+union all
+select value x2
+from Tweets2 as x2
+union all
+select value x3
+from Tweets3 as x3
+union all
+select value x4
+from Tweets4 as x4
+order by id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
new file mode 100644
index 0000000..2c37364
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
@@ -0,0 +1,15 @@
+{ "id": "nc1:1", "username": "BronsonMike", "location": "", "text": "@GottaLaff @reutersus Christie and obama just foul weather friends", "timestamp": "Thu Dec 06 16:53:06 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ... #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ... #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" }
+{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT @Newitrsdotcom: I hope #Obama will win re-election... Other four years without meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }
+{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT @Newitrsdotcom: I hope #Obama will win re-election... Other four years without meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index bb0d473..96dbf01 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8483,6 +8483,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
+ <compilation-unit name="feeds_13">
+ <output-dir compare="Text">feeds_13</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
<compilation-unit name="issue_230_feeds">
<output-dir compare="Text">issue_230_feeds</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index 02d2220..74fe907 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -1166,7 +1166,7 @@
<CONNECT> <FEED> feedNameComponents = QualifiedName() <TO> <DATASET> datasetNameComponents = QualifiedName()
(ApplyFunction(appliedFunctions))? (policy = GetPolicy())?
{
- stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunctions, policy, getVarCounter());
+ stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunctions, policy, null, getVarCounter());
}
| <DISCONNECT> <FEED> feedNameComponents = QualifiedName() <FROM> <DATASET> datasetNameComponents = QualifiedName()
{
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
index 3b6a1c3..b0a3f6e 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
@@ -34,11 +34,12 @@
private final Identifier datasetName;
private final String feedName;
private final String policy;
+ private final String whereClauseBody;
private int varCounter;
private final List<FunctionSignature> appliedFunctions;
public ConnectFeedStatement(Pair<Identifier, Identifier> feedNameCmp, Pair<Identifier, Identifier> datasetNameCmp,
- List<FunctionSignature> appliedFunctions, String policy, int varCounter) {
+ List<FunctionSignature> appliedFunctions, String policy, String whereClauseBody, int varCounter) {
if (feedNameCmp.first != null && datasetNameCmp.first != null
&& !feedNameCmp.first.getValue().equals(datasetNameCmp.first.getValue())) {
throw new IllegalArgumentException("Dataverse for source feed and target dataset do not match");
@@ -48,6 +49,7 @@
this.datasetName = datasetNameCmp.second;
this.feedName = feedNameCmp.second.getValue();
this.policy = policy != null ? policy : BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName();
+ this.whereClauseBody = whereClauseBody;
this.varCounter = varCounter;
this.appliedFunctions = appliedFunctions;
}
@@ -64,6 +66,10 @@
return varCounter;
}
+ public String getWhereClauseBody() {
+ return whereClauseBody;
+ }
+
@Override
public Kind getKind() {
return Statement.Kind.CONNECT_FEED;
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 7a99814..42b8d15 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -157,6 +157,7 @@
import org.apache.asterix.lang.sqlpp.clause.SelectRegular;
import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
+import org.apache.asterix.lang.common.clause.WhereClause;
import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
import org.apache.asterix.lang.sqlpp.optype.JoinType;
@@ -1264,14 +1265,37 @@
List<FunctionSignature> appliedFunctions = new ArrayList<FunctionSignature>();
Statement stmt = null;
String policy = null;
+ String whereClauseBody = null;
+ WhereClause whereClause = null;
+ Token beginPos = null;
+ Token endPos = null;
}
{
(
<FEED> feedNameComponents = QualifiedName() <TO> Dataset() datasetNameComponents = QualifiedName()
- (ApplyFunction(appliedFunctions))? (policy = GetPolicy())?
+ (ApplyFunction(appliedFunctions))?
+ (policy = GetPolicy())?
+ (
+ <WHERE>
+ {
+ beginPos = token;
+ whereClause = new WhereClause();
+ Expression whereExpr;
+ }
+ whereExpr = Expression()
+ {
+ whereClause.setWhereExpr(whereExpr);
+ }
+ )?
+ {
+ if (whereClause != null) {
+ endPos = token;
+ whereClauseBody = extractFragment(beginPos.endLine, beginPos.endColumn, endPos.endLine, endPos.endColumn + 1);
+ }
+ }
{
stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunctions,
- policy, getVarCounter());
+ policy, whereClauseBody, getVarCounter());
}
)
{
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index a1eb425..367f568 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -242,7 +242,8 @@
}
public void dropFeedConnection(String dataverseName, String feedName, String datasetName) {
- FeedConnection feedConnection = new FeedConnection(dataverseName, feedName, datasetName, null, null, null);
+ FeedConnection feedConnection =
+ new FeedConnection(dataverseName, feedName, datasetName, null, null, null, null);
droppedCache.addFeedConnectionIfNotExists(feedConnection);
logAndApply(new MetadataLogicalOperation(feedConnection, false));
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 54a69eb..ba1ea03 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -100,6 +100,7 @@
public static final String FIELD_NAME_VALUE = "Value";
public static final String FIELD_NAME_WORKING_MEMORY_SIZE = "WorkingMemorySize";
public static final String FIELD_NAME_APPLIED_FUNCTIONS = "AppliedFunctions";
+ public static final String FIELD_NAME_WHERE_CLAUSE = "WhereClause";
//---------------------------------- Record Types Creation ----------------------------------//
//--------------------------------------- Properties ----------------------------------------//
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
index 7572a9a..78d6e4e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
@@ -40,17 +40,19 @@
private String feedName;
private String datasetName;
private String policyName;
+ private String whereClauseBody;
private String outputType;
private List<FunctionSignature> appliedFunctions;
public FeedConnection(String dataverseName, String feedName, String datasetName,
- List<FunctionSignature> appliedFunctions, String policyName, String outputType) {
+ List<FunctionSignature> appliedFunctions, String policyName, String whereClauseBody, String outputType) {
this.dataverseName = dataverseName;
this.feedName = feedName;
this.datasetName = datasetName;
this.appliedFunctions = appliedFunctions;
this.connectionId = feedName + ":" + datasetName;
this.policyName = policyName;
+ this.whereClauseBody = whereClauseBody == null ? "" : whereClauseBody;
this.outputType = outputType;
this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dataverseName, feedName);
}
@@ -105,6 +107,10 @@
return policyName;
}
+ public String getWhereClauseBody() {
+ return whereClauseBody;
+ }
+
public String getOutputType() {
return outputType;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
index 269497b..61a8ab2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
@@ -22,6 +22,7 @@
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.util.ArrayList;
import java.util.List;
@@ -32,6 +33,7 @@
import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.base.AMissing;
import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.base.ARecord;
@@ -53,6 +55,8 @@
public static final int FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX = 3;
+ protected final transient ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage();
+
private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE);
@@ -101,7 +105,12 @@
}
}
- return new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, outputType);
+ int whereClauseIdx = feedConnRecord.getType().getFieldIndex(MetadataRecordTypes.FIELD_NAME_WHERE_CLAUSE);
+ String whereClauseBody =
+ whereClauseIdx >= 0 ? ((AString) feedConnRecord.getValueByPos(whereClauseIdx)).getStringValue() : "";
+
+ return new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, whereClauseBody,
+ outputType);
}
@Override
@@ -159,6 +168,9 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.FEED_CONN_POLICY_FIELD_INDEX, fieldValue);
+ // field: whereClauseBody
+ writeOpenPart(me);
+
recordBuilder.write(tupleBuilder.getDataOutput(), true);
tupleBuilder.addFieldEndOffset();
@@ -166,6 +178,18 @@
return tuple;
}
+ protected void writeOpenPart(FeedConnection fc) throws HyracksDataException {
+ if (fc.getWhereClauseBody() != null && fc.getWhereClauseBody().length() > 0) {
+ fieldName.reset();
+ aString.setValue(MetadataRecordTypes.FIELD_NAME_WHERE_CLAUSE);
+ stringSerde.serialize(aString, fieldName.getDataOutput());
+ fieldValue.reset();
+ aString.setValue(fc.getWhereClauseBody());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(fieldName, fieldValue);
+ }
+ }
+
private void writeAppliedFunctionsField(IARecordBuilder rb, FeedConnection fc, ArrayBackedValueStorage buffer)
throws HyracksDataException {
UnorderedListBuilder listBuilder = new UnorderedListBuilder();