added test case for inverted index + minor change to the c'tor (putting indexName as the last arg)
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql
new file mode 100644
index 0000000..7758f28
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql
@@ -0,0 +1,41 @@
+/*
+ * Description : Create a feed using the synthetic feed simulator adapter.
+ Create a dataset that has an associated ngram index.
+ The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed.
+ The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+ Verify the existence of data after the feed finishes.
+ * Issue : 711
+ * Expected Res : Success
+ * Date : 8th Feb 2014
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+create type TwitterUserType as closed {
+ screen-name: string,
+ lang: string,
+ friends_count: int32,
+ statuses_count: int32,
+ name: string,
+ followers_count: int32
+}
+
+create type TweetMessageType as closed {
+ tweetid: int64,
+ user: TwitterUserType,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string
+}
+
+create dataset SyntheticTweets(TweetMessageType)
+primary key tweetid;
+
+create index ngram_index on SyntheticTweets(message-text) type ngram(3);
+
+create feed SyntheticTweetFeed
+using twitter_firehose
+(("duration"="5"),("tps"="50"),("tput-duration"="5"),("dataverse-dataset"="feeds:SyntheticTweets"),("mode"="controlled"));
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.2.update.aql
new file mode 100644
index 0000000..51f2623
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.2.update.aql
@@ -0,0 +1,16 @@
+/*
+ * Description : Create a feed using the synthetic feed simulator adapter.
+ Create a dataset that has an associated ngram index.
+ The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed.
+ The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+ Verify the existence of data after the feed finishes.
+ * Issue : 711
+ * Expected Res : Success
+ * Date : 8th Feb 2014
+ */
+
+use dataverse feeds;
+
+set wait-for-completion-feed "true";
+
+connect feed SyntheticTweetFeed to dataset SyntheticTweets;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.3.query.aql
new file mode 100644
index 0000000..e07b63e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.3.query.aql
@@ -0,0 +1,22 @@
+/*
+ * Description : Create a feed using the synthetic feed simulator adapter.
+ Create a dataset that has an associated ngram index.
+ The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed.
+ The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+ Verify the existence of data after the feed finishes.
+ * Issue : 711
+ * Expected Res : Success
+ * Date : 8th Feb 2014
+ */
+
+use dataverse feeds;
+
+let $totalTweets:=count(
+for $x in dataset('SyntheticTweets')
+return $x)
+return
+(if($totalTweets > 0)
+ then 1
+else
+ 0
+)
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_07/feeds_07.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_07/feeds_07.1.adm
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_07/feeds_07.1.adm
@@ -0,0 +1 @@
+1
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_08/feeds_08.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_08/feeds_08.1.adm
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_08/feeds_08.1.adm
@@ -0,0 +1 @@
+1
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 58c0d38..615ae17 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -4451,6 +4451,16 @@
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
+ <compilation-unit name="feeds_07">
+ <output-dir compare="Text">feeds_07</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_08">
+ <output-dir compare="Text">feeds_08</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
<compilation-unit name="issue_230_feeds">
<output-dir compare="Text">issue_230_feeds</output-dir>
</compilation-unit>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
index b18b90a..45d6d9e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
@@ -34,23 +34,23 @@
private static final long serialVersionUID = 1L;
+ private final boolean isPrimary;
+
/** the name of the index that is being operated upon **/
private final String indexName;
- private final boolean isPrimary;
-
- public AsterixLSMTreeInsertDeleteOperatorDescriptor(String indexName, IOperatorDescriptorRegistry spec,
+ public AsterixLSMTreeInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor recDesc, IStorageManagerInterface storageManager,
IIndexLifecycleManagerProvider lifecycleManagerProvider, IFileSplitProvider fileSplitProvider,
ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields,
int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
ITupleFilterFactory tupleFilterFactory,
- IModificationOperationCallbackFactory modificationOpCallbackProvider, boolean isPrimary) {
+ IModificationOperationCallbackFactory modificationOpCallbackProvider, boolean isPrimary, String indexName) {
super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, op, dataflowHelperFactory,
tupleFilterFactory, modificationOpCallbackProvider);
- this.indexName = indexName;
this.isPrimary = isPrimary;
+ this.indexName = indexName;
}
@Override
@@ -60,14 +60,14 @@
recordDescProvider, op, isPrimary);
}
- public String getIndexName() {
- return indexName;
- }
-
public boolean isPrimary() {
return isPrimary;
}
+ public String getIndexName() {
+ return indexName;
+ }
+
public int[] getFieldPermutations() {
return fieldPermutation;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 63aa5db..972415a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -862,14 +862,15 @@
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
AsterixLSMTreeInsertDeleteOperatorDescriptor insertDeleteOp = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
- indexName, spec, recordDesc, appContext.getStorageManagerInterface(),
+ spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
- .getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true);
+ .getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true,
+ indexName);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
splitsAndConstraint.second);
@@ -1057,7 +1058,7 @@
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
AsterixLSMTreeInsertDeleteOperatorDescriptor btreeBulkLoad = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
- indexName, spec, recordDesc, appContext.getStorageManagerInterface(),
+ spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
@@ -1065,7 +1066,7 @@
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
.getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory,
- false);
+ false, indexName);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
} catch (MetadataException e) {
@@ -1287,7 +1288,7 @@
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
AsterixLSMTreeInsertDeleteOperatorDescriptor rtreeUpdate = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
- indexName, spec, recordDesc, appContext.getStorageManagerInterface(),
+ spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, null, fieldPermutation, indexOp, new LSMRTreeDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
@@ -1296,7 +1297,7 @@
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(nestedKeyType.getTypeTag(),
comparatorFactories.length), storageProperties.getBloomFilterFalsePositiveRate()),
- filterFactory, modificationCallbackFactory, false);
+ filterFactory, modificationCallbackFactory, false, indexName);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
} catch (MetadataException | IOException e) {
throw new AlgebricksException(e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index d895d5c..94185a4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -62,11 +62,11 @@
*/
private final FeedRuntimeType runtimeType;
- private final String operationId;
+ private final String operandId;
public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
IOperatorDescriptor coreOperatorDescriptor, FeedPolicy feedPolicy, FeedRuntimeType runtimeType,
- String operationId) {
+ String operandId) {
super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
this.feedConnectionId = feedConnectionId;
this.feedPolicy = feedPolicy;
@@ -75,14 +75,14 @@
}
this.coreOperator = coreOperatorDescriptor;
this.runtimeType = runtimeType;
- this.operationId = operationId;
+ this.operandId = operandId;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
- feedConnectionId, feedPolicy, runtimeType, operationId);
+ feedConnectionId, feedPolicy, runtimeType, operandId);
}
@Override