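checkpoint: allow SET in the update API, add wait-for-completion for begin feed, add feeds_05 synthetic feed test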
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/UpdateAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/UpdateAPIServlet.java
index fed1238..6c300bd 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/UpdateAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/UpdateAPIServlet.java
@@ -32,7 +32,7 @@
protected List<Statement.Kind> getAllowedStatements() {
Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.DELETE, Kind.INSERT, Kind.UPDATE,
Kind.DML_CMD_LIST, Kind.LOAD_FROM_FILE, Kind.WRITE_FROM_QUERY_RESULT, Kind.BEGIN_FEED,
- Kind.CONTROL_FEED };
+ Kind.CONTROL_FEED, Kind.SET };
return Arrays.asList(statementsArray);
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 9e628c2..0084b08 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -545,29 +545,17 @@
private String configureNodegroupForDataset(DatasetDecl dd, String dataverse, Identifier nodegroup,
MetadataTransactionContext mdTxnCtx) throws AsterixException {
- boolean allNodesNodegroup = false;
int nodegroupCardinality = -1;
- String nodegroupName = null;
- if (nodegroup == null) {
- String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
- if (hintValue == null) {
- allNodesNodegroup = true;
+ String nodegroupName = nodegroup != null ? nodegroup.getValue() : null;
+ String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
+ if (nodegroupName == null && hintValue != null) {
+ boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME,
+ dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)).first;
+ if (!valid) {
+ throw new AsterixException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME);
} else {
- boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME, hintValue).first;
- if (!valid) {
- throw new AsterixException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME);
- } else {
- nodegroupCardinality = Integer.parseInt(hintValue);
- }
+ nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME));
}
- } else {
- allNodesNodegroup = nodegroup.getValue()
- .equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME);
- }
-
- if (allNodesNodegroup) {
- nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
- } else {
Set<String> nodeNames = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
List<String> selectedNodes = new ArrayList<String>();
@@ -592,6 +580,8 @@
}
nodegroupName = dataverse + ":" + dd.getName().getValue();
MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
+ } else {
+ nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
}
return nodegroupName;
}
@@ -1377,6 +1367,7 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
acquireReadLatch();
+ boolean readLatchAcquired = true;
try {
BeginFeedStatement bfs = (BeginFeedStatement) stmt;
String dataverseName = getActiveDataverseName(bfs.getDataverseName());
@@ -1401,14 +1392,23 @@
JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- runJob(hcc, compiled, false);
+ String waitForCompletionParam = metadataProvider.getConfig().get(BeginFeedStatement.WAIT_FOR_COMPLETION);
+ boolean waitForCompletion = waitForCompletionParam == null ? false : Boolean
+ .valueOf(waitForCompletionParam);
+ if (waitForCompletion) {
+ releaseReadLatch();
+ readLatchAcquired = false;
+ }
+ runJob(hcc, compiled, waitForCompletion);
} catch (Exception e) {
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
}
throw e;
} finally {
- releaseReadLatch();
+ if (readLatchAcquired) {
+ releaseReadLatch();
+ }
}
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 8e01f09..dc7dec6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -44,12 +44,14 @@
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.feeds.FeedId;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
@@ -489,7 +491,7 @@
PrintWriter writer = new PrintWriter(System.out, true);
try {
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Attempting to Resume feeds!!!!!!");
+ LOGGER.info("Attempting to Resume feeds!");
}
Thread.sleep(2000);
MetadataManager.INSTANCE.init();
@@ -499,6 +501,19 @@
for (FeedActivity fa : activeFeeds) {
String feedPolicy = fa.getFeedActivityDetails().get(FeedActivityDetails.FEED_POLICY_NAME);
+
+ FeedPolicy policy = MetadataManager.INSTANCE.getFeedPolicy(ctx, fa.getDataverseName(), feedPolicy);
+ if (policy == null) {
+ policy = MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME,
+ feedPolicy);
+ if (policy == null) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Unable to resume feed: " + fa.getDataverseName() + ":"
+ + fa.getDatasetName() + "." + " Unknown policy :" + feedPolicy);
+ }
+ }
+ }
+
String dataverse = fa.getDataverseName();
String datasetName = fa.getDatasetName();
if (LOGGER.isLoggable(Level.INFO)) {
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index 36bf23c..b1bf69d 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -116,6 +116,8 @@
@Test
public void test() throws Exception {
- TestsUtils.executeTest(PATH_ACTUAL, tcCtx);
+ // Run every test case unconditionally; a path filter left here would
+ // silently skip the rest of the runtime suite.
+ TestsUtils.executeTest(PATH_ACTUAL, tcCtx);
}
}
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_02/feeds_02.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_02/feeds_02.1.ddl.aql
index aafd2c9..1f6b30d 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_02/feeds_02.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_02/feeds_02.1.ddl.aql
@@ -9,6 +9,7 @@
create dataverse feeds;
use dataverse feeds;
+
create type TweetType as closed {
id: string,
username : string,
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_02/feeds_02.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_02/feeds_02.2.update.aql
index 01b0925..26b1469 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_02/feeds_02.2.update.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_02/feeds_02.2.update.aql
@@ -6,5 +6,7 @@
*/
use dataverse feeds;
-
+
+set wait-for-completion-feed "true";
+
begin feed TweetFeed;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_04/feeds_04.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_04/feeds_04.2.update.aql
index 060576e..c0d70b9 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_04/feeds_04.2.update.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_04/feeds_04.2.update.aql
@@ -8,4 +8,6 @@
use dataverse feeds;
+set wait-for-completion-feed "true";
+
begin feed TweetFeed;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_05/feeds_05.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_05/feeds_05.1.ddl.aql
new file mode 100644
index 0000000..bdeed63
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_05/feeds_05.1.ddl.aql
@@ -0,0 +1,37 @@
+/*
+ * Description : Create a feed dataset that uses the synthetic feed simulator adapter.
+ The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed. The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+ Verify the existence of data after the feed finishes.
+
+ * Expected Res : Success
+ * Date : 20th Jun 2013
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+create type TwitterUserType as closed {
+ screen-name: string,
+ lang: string,
+ friends_count: int32,
+ statuses_count: int32,
+ name: string,
+ followers_count: int32
+}
+
+create type TweetMessageType as closed {
+ tweetid: string,
+ user: TwitterUserType,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string
+}
+
+
+create feed dataset SyntheticTweetFeed(TweetMessageType)
+using "edu.uci.ics.asterix.tools.external.data.SyntheticTwitterFeedAdapterFactory"
+(("duration"="5"),("tps"="2"),("dataverse-dataset"="feeds:SyntheticTweetFeed"))
+primary key tweetid;
+
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_05/feeds_05.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_05/feeds_05.2.update.aql
new file mode 100644
index 0000000..378cada
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_05/feeds_05.2.update.aql
@@ -0,0 +1,15 @@
+/*
+ * Description : Create a feed dataset that uses the synthetic feed simulator adapter.
+ The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed.
+ The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+ Verify the existence of data after the feed finishes.
+
+ * Expected Res : Success
+ * Date : 20th Jun 2013
+ */
+
+use dataverse feeds;
+
+set wait-for-completion-feed "true";
+
+begin feed SyntheticTweetFeed;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_05/feeds_05.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_05/feeds_05.3.query.aql
new file mode 100644
index 0000000..286f32b
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_05/feeds_05.3.query.aql
@@ -0,0 +1,20 @@
+/*
+ * Description : Create a feed dataset that uses the synthetic feed simulator adapter.
+ The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed. The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+ Verify the existence of data after the feed finishes.
+
+ * Expected Res : Success
+ * Date : 20th Jun 2013
+ */
+
+use dataverse feeds;
+
+let $totalTweets:=count(
+for $x in dataset('SyntheticTweetFeed')
+return $x)
+return
+(if($totalTweets > 0)
+ 1
+else
+ 0
+)
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/issue_230_feeds/issue_230_feeds.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_230_feeds/issue_230_feeds.1.ddl.aql
index 62ac61f..f61f320 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/issue_230_feeds/issue_230_feeds.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_230_feeds/issue_230_feeds.1.ddl.aql
@@ -8,6 +8,7 @@
create dataverse feeds;
use dataverse feeds;
+
create type TweetType as closed {
id: string,
username : string,
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/issue_230_feeds/issue_230_feeds.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_230_feeds/issue_230_feeds.2.update.aql
index 0e22d6e..466bbf1 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/issue_230_feeds/issue_230_feeds.2.update.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_230_feeds/issue_230_feeds.2.update.aql
@@ -6,5 +6,5 @@
*/
use dataverse feeds;
-
+
begin feed feeds.TweetFeed;
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_05/feeds_05.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_05/feeds_05.1.adm
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_05/feeds_05.1.adm
@@ -0,0 +1 @@
+1
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 7571664..8a2c439 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -4314,6 +4314,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
+ <compilation-unit name="feeds_05">
+ <output-dir compare="Text">feeds_05</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
<compilation-unit name="issue_230_feeds">
<output-dir compare="Text">issue_230_feeds</output-dir>
</compilation-unit>