Merge branch 'master' into raman/master_feeds_711
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
new file mode 100644
index 0000000..d84c49b
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
@@ -0,0 +1,41 @@
+/*
+ * Description : Create a feed using the synthetic feed simulator adapter.
+ Create a dataset that has an associated rtree index.
+ The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed.
+ The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+ Verify the existence of data after the feed finishes.
+ * Issue : 711
+ * Expected Res : Success
+ * Date : 8th Feb 2014
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+create type TwitterUserType as closed {
+ screen-name: string,
+ lang: string,
+ friends_count: int32,
+ statuses_count: int32,
+ name: string,
+ followers_count: int32
+}
+
+create type TweetMessageType as closed {
+ tweetid: int64,
+ user: TwitterUserType,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string
+}
+
+create dataset SyntheticTweets(TweetMessageType)
+primary key tweetid; /* dataset the feed writes into (see "dataverse-dataset" below), keyed on tweetid */
+
+create index locationIdx on SyntheticTweets(sender-location) type rtree; /* secondary rtree index on the point-typed sender-location field */
+
+create feed SyntheticTweetFeed
+using twitter_firehose /* the synthetic feed simulator adapter referred to in the description above */
+(("duration"="5"),("tps"="50"),("tput-duration"="5"),("dataverse-dataset"="feeds:SyntheticTweets"),("mode"="controlled")); /* NOTE(review): "tps" is presumably the tweets-per-second rate from the description; units of "duration"/"tput-duration" and the meaning of "mode" are not shown here -- confirm against the adapter */
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.2.update.aql
new file mode 100644
index 0000000..c52867a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.2.update.aql
@@ -0,0 +1,16 @@
+/*
+ * Description : Create a feed using the synthetic feed simulator adapter.
+ Create a dataset that has an associated rtree index.
+ The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed.
+ The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+ Verify the existence of data after the feed finishes.
+ * Issue : 711
+ * Expected Res : Success
+ * Date : 8th Feb 2014
+ */
+
+use dataverse feeds;
+
+set wait-for-completion-feed "true";
+
+connect feed SyntheticTweetFeed to dataset SyntheticTweets;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.3.query.aql
new file mode 100644
index 0000000..9dcafda
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.3.query.aql
@@ -0,0 +1,22 @@
+/*
+ * Description : Create a feed using the synthetic feed simulator adapter.
+ Create a dataset that has an associated rtree index.
+ The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed.
+ The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+ Verify the existence of data after the feed finishes.
+ * Issue : 711
+ * Expected Res : Success
+ * Date : 8th Feb 2014
+ */
+
+use dataverse feeds;
+
+let $totalTweets:=count(
+for $x in dataset('SyntheticTweets')
+return $x)
+return
+(if($totalTweets > 0)
+ then 1
+else
+ 0
+)
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.1.ddl.aql
new file mode 100644
index 0000000..42a16d7
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.1.ddl.aql
@@ -0,0 +1,30 @@
+/*
+ * Description : Create a dataset with a secondary btree index.
+ Create a feed that uses the file_feed adaptor.
+ The file_feed adaptor simulates a feed from a file in HDFS.
+ Connect the feed to the dataset and verify the contents of the dataset after the feed completes.
+ * Issue : 711
+ * Expected Res : Success
+ * Date : 6th Feb 2014
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+create type TweetType as closed {
+ id: string,
+ username : string,
+ location : string,
+ text : string,
+ timestamp : string
+}
+
+create dataset Tweets(TweetType)
+primary key id; /* dataset that TweetFeed is later connected to, keyed on the string id */
+
+create index usernameIdx on Tweets(username) type btree; /* the secondary btree index named in the description above */
+
+create feed TweetFeed
+using file_feed /* per the description above, simulates a feed from a file in HDFS */
+(("fs"="hdfs"),("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/obamatweets.adm"),("format"="adm"),("input-format"="text-input-format"),("type-name"="TweetType"),("tuple-interval"="10")); /* source is /asterix/obamatweets.adm on the HDFS instance at 127.0.0.1:31888, declared as ADM typed TweetType; NOTE(review): "tuple-interval" presumably paces record delivery -- units not shown, confirm */
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.2.update.aql
new file mode 100644
index 0000000..cd9a45e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.2.update.aql
@@ -0,0 +1,15 @@
+/*
+ * Description : Create a dataset with a secondary btree index.
+ Create a feed that uses the file_feed adaptor.
+ The file_feed adaptor simulates a feed from a file in HDFS.
+ Connect the feed to the dataset and verify the contents of the dataset after the feed completes.
+ * Issue : 711
+ * Expected Res : Success
+ * Date : 6th Feb 2014
+ */
+
+use dataverse feeds;
+
+set wait-for-completion-feed "true";
+
+connect feed TweetFeed to dataset Tweets;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.3.query.aql
new file mode 100644
index 0000000..62dfb82
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.3.query.aql
@@ -0,0 +1,14 @@
+/*
+ * Description : Create a dataset with a secondary btree index.
+ Create a feed that uses the file_feed adaptor.
+ The file_feed adaptor simulates a feed from a file in HDFS.
+ Connect the feed to the dataset and verify the contents of the dataset after the feed completes.
+ * Issue : 711
+ * Expected Res : Success
+ * Date : 6th Feb 2014
+ */
+use dataverse feeds;
+
+for $x in dataset('Tweets')
+order by $x.id
+return $x
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/issue_711_feeds/issue_711_feeds.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/issue_711_feeds/issue_711_feeds.1.adm
new file mode 100644
index 0000000..2567483
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/issue_711_feeds/issue_711_feeds.1.adm
@@ -0,0 +1,11 @@
+{ "id": "nc1:1", "username": "BronsonMike", "location": "", "text": "@GottaLaff @reutersus Christie and obama just foul weather friends", "timestamp": "Thu Dec 06 16:53:06 PST 2012" }
+{ "id": "nc1:100", "username": "KidrauhlProuds", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
+{ "id": "nc1:102", "username": "jaysauce82", "location": "", "text": "Not voting for President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
+{ "id": "nc1:104", "username": "princeofsupras", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson e uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:15 PST 2012" }
+{ "id": "nc1:106", "username": "GulfDogs", "location": "", "text": "Obama Admin Knew Libyan Terrorists Had US-Provided Weaponsteaparty #tcot #ccot #NewGuards #BreitbartArmy #patriotwttp://t.co/vJxzrQUE", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
+{ "id": "nc1:108", "username": "Laugzpz", "location": "", "text": "@AlfredoJalife Maestro Obama se hace de la vista gorda, es un acuerdo de siempre creo yo.", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ... #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" }
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 067fddb..58c0d38 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -4455,6 +4455,11 @@
<output-dir compare="Text">issue_230_feeds</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="issue_711_feeds">
+ <output-dir compare="Text">issue_711_feeds</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="hdfs">
<test-case FilePath="hdfs">
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
index 43afd27..57479f3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
@@ -35,16 +35,19 @@
private static final long serialVersionUID = 1L;
+ private final String indexName;
+
public AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor recDesc, IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
- ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory) {
+ ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory, String indexName) {
super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
fieldPermutation, op, dataflowHelperFactory, tupleFilterFactory, modificationOpCallbackFactory);
+ this.indexName = indexName;
}
@Override
@@ -53,4 +56,8 @@
return new AsterixLSMInsertDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
recordDescProvider, op, false);
}
+
+ public String getIndexName() {
+ return indexName;
+ }
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
index 6f6a7b2..b18b90a 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
@@ -34,18 +34,22 @@
private static final long serialVersionUID = 1L;
+ /** the name of the index that is being operated upon **/
+ private final String indexName;
+
private final boolean isPrimary;
- public AsterixLSMTreeInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
- IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
- IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
- IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
+ public AsterixLSMTreeInsertDeleteOperatorDescriptor(String indexName, IOperatorDescriptorRegistry spec,
+ RecordDescriptor recDesc, IStorageManagerInterface storageManager,
+ IIndexLifecycleManagerProvider lifecycleManagerProvider, IFileSplitProvider fileSplitProvider,
+ ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields,
+ int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
ITupleFilterFactory tupleFilterFactory,
IModificationOperationCallbackFactory modificationOpCallbackProvider, boolean isPrimary) {
super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, op, dataflowHelperFactory,
tupleFilterFactory, modificationOpCallbackProvider);
+ this.indexName = indexName;
this.isPrimary = isPrimary;
}
@@ -56,6 +60,10 @@
recordDescProvider, op, isPrimary);
}
+ public String getIndexName() {
+ return indexName;
+ }
+
public boolean isPrimary() {
return isPrimary;
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
index d1b2faf..88e1db5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
@@ -34,12 +34,16 @@
protected FeedRuntimeState runtimeState;
public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType) {
- this.feedRuntimeId = new FeedRuntimeId(feedRuntimeType, feedId, partition);
+ this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, partition);
}
- public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType,
+ public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType, String operandId) {
+ this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, operandId, partition);
+ }
+
+ public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType, String operandId,
FeedRuntimeState runtimeState) {
- this.feedRuntimeId = new FeedRuntimeId(feedRuntimeType, feedId, partition);
+ this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, operandId, partition);
this.runtimeState = runtimeState;
}
@@ -88,13 +92,24 @@
public static class FeedRuntimeId {
+ public static final String DEFAULT_OPERATION_ID = "N/A";
private final FeedRuntimeType feedRuntimeType;
+ private final String operandId;
private final FeedConnectionId feedId;
private final int partition;
private final int hashCode;
- public FeedRuntimeId(FeedRuntimeType runtimeType, FeedConnectionId feedId, int partition) {
+ public FeedRuntimeId(FeedConnectionId feedId, FeedRuntimeType runtimeType, String operandId, int partition) {
this.feedRuntimeType = runtimeType;
+ this.operandId = operandId;
+ this.feedId = feedId;
+ this.partition = partition;
+ this.hashCode = (feedId + "[" + partition + "]" + feedRuntimeType).hashCode();
+ }
+
+ public FeedRuntimeId(FeedConnectionId feedId, FeedRuntimeType runtimeType, int partition) {
+ this.feedRuntimeType = runtimeType;
+ this.operandId = DEFAULT_OPERATION_ID;
this.feedId = feedId;
this.partition = partition;
this.hashCode = (feedId + "[" + partition + "]" + feedRuntimeType).hashCode();
@@ -102,7 +117,7 @@
@Override
public String toString() {
- return feedId + "[" + partition + "]" + " " + feedRuntimeType;
+ return feedId + "[" + partition + "]" + " " + feedRuntimeType + "(" + operandId + ")";
}
@Override
@@ -115,7 +130,7 @@
if (o instanceof FeedRuntimeId) {
FeedRuntimeId oid = ((FeedRuntimeId) o);
return oid.getFeedId().equals(feedId) && oid.getFeedRuntimeType().equals(feedRuntimeType)
- && oid.getPartition() == partition;
+ && oid.getOperandId().equals(operandId) && oid.getPartition() == partition;
}
return false;
}
@@ -128,6 +143,10 @@
return feedId;
}
+ public String getOperandId() {
+ return operandId;
+ }
+
public int getPartition() {
return partition;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 1fb64be..63aa5db 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -862,7 +862,7 @@
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
AsterixLSMTreeInsertDeleteOperatorDescriptor insertDeleteOp = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
- spec, recordDesc, appContext.getStorageManagerInterface(),
+ indexName, spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
@@ -1057,7 +1057,7 @@
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
AsterixLSMTreeInsertDeleteOperatorDescriptor btreeBulkLoad = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
- spec, recordDesc, appContext.getStorageManagerInterface(),
+ indexName, spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
@@ -1204,7 +1204,7 @@
spec, recordDesc, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
- indexDataFlowFactory, filterFactory, modificationCallbackFactory);
+ indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
splitsAndConstraint.second);
} catch (MetadataException e) {
@@ -1287,7 +1287,7 @@
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
AsterixLSMTreeInsertDeleteOperatorDescriptor rtreeUpdate = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
- spec, recordDesc, appContext.getStorageManagerInterface(),
+ indexName, spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, null, fieldPermutation, indexOp, new LSMRTreeDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index b732191..7c25be8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -100,7 +100,7 @@
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
IFeedAdapter adapter = null;
- FeedRuntimeId feedRuntimeId = new FeedRuntimeId(FeedRuntimeType.INGESTION, feedId, partition);
+ FeedRuntimeId feedRuntimeId = new FeedRuntimeId(feedId, FeedRuntimeType.INGESTION, partition);
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
this.feedManager = runtimeCtx.getFeedManager();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 47b00dd..6380269 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -57,7 +57,7 @@
public void initialize() throws HyracksDataException {
try {
writer.open();
- FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.INGESTION, feedId, partition);
+ FeedRuntimeId runtimeId = new FeedRuntimeId(feedId, FeedRuntimeType.INGESTION, partition);
FeedRuntime feedRuntime = feedManager.getFeedRuntime(runtimeId);
boolean ingestionLocation = feedRuntime != null;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 1a8a460..d895d5c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -62,8 +62,11 @@
*/
private final FeedRuntimeType runtimeType;
+ private final String operationId;
+
public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
- IOperatorDescriptor coreOperatorDescriptor, FeedPolicy feedPolicy, FeedRuntimeType runtimeType) {
+ IOperatorDescriptor coreOperatorDescriptor, FeedPolicy feedPolicy, FeedRuntimeType runtimeType,
+ String operationId) {
super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
this.feedConnectionId = feedConnectionId;
this.feedPolicy = feedPolicy;
@@ -72,13 +75,14 @@
}
this.coreOperator = coreOperatorDescriptor;
this.runtimeType = runtimeType;
+ this.operationId = operationId;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
- feedConnectionId, feedPolicy, runtimeType);
+ feedConnectionId, feedPolicy, runtimeType, operationId);
}
@Override
@@ -131,9 +135,11 @@
/** The (singleton) instance of IFeedManager **/
private IFeedManager feedManager;
+ private final String operandId;
+
public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
- FeedPolicy feedPolicy, FeedRuntimeType runtimeType) throws HyracksDataException {
+ FeedPolicy feedPolicy, FeedRuntimeType runtimeType, String operationId) throws HyracksDataException {
this.coreOperatorNodePushable = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
.createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicy.getProperties());
@@ -145,15 +151,16 @@
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
this.feedManager = runtimeCtx.getFeedManager();
+ this.operandId = operationId;
}
@Override
public void open() throws HyracksDataException {
- FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, feedId, partition);
+ FeedRuntimeId runtimeId = new FeedRuntimeId(feedId, runtimeType, operandId, partition);
try {
feedRuntime = feedManager.getFeedRuntime(runtimeId);
if (feedRuntime == null) {
- feedRuntime = new FeedRuntime(feedId, partition, runtimeType);
+ feedRuntime = new FeedRuntime(feedId, partition, runtimeType, operandId);
feedManager.registerFeedRuntime(feedRuntime);
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Did not find a saved state from a previous zombie, starting a new instance for "
@@ -183,12 +190,14 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
try {
if (resumeOldState) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("State from previous zombie instance "
- + feedRuntime.getRuntimeState().getFrame());
+ FeedRuntimeState runtimeState = feedRuntime.getRuntimeState(); // read once: the null check and the uses below must see the same state object
+ if (runtimeState != null) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("State from previous zombie instance " + runtimeState);
+ }
+ coreOperatorNodePushable.nextFrame(runtimeState.getFrame()); // replay the frame saved by the previous instance before handling the new buffer
+ feedRuntime.setRuntimeState(null); // saved state is consumed; clear it so it is not replayed again
+ }
- coreOperatorNodePushable.nextFrame(feedRuntime.getRuntimeState().getFrame());
- feedRuntime.setRuntimeState(null);
resumeOldState = false;
}
currentBuffer = buffer;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index 6e0ed83..b90038d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -26,8 +26,10 @@
import org.apache.commons.lang3.tuple.Pair;
+import edu.uci.ics.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntime;
import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -91,8 +93,10 @@
Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
// copy operators
+ String operationId = null;
Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
+ operationId = FeedRuntime.FeedRuntimeId.DEFAULT_OPERATION_ID;
IOperatorDescriptor opDesc = entry.getValue();
if (opDesc instanceof FeedIntakeOperatorDescriptor) {
FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
@@ -107,8 +111,14 @@
}
oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
} else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
+ operationId = ((AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
- feedPolicy, FeedRuntimeType.STORAGE);
+ feedPolicy, FeedRuntimeType.STORAGE, operationId);
+ oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+ } else if (opDesc instanceof AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) {
+ operationId = ((AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) opDesc).getIndexName();
+ FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
+ feedPolicy, FeedRuntimeType.STORAGE, operationId);
oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
} else {
FeedRuntimeType runtimeType = null;
@@ -122,7 +132,7 @@
}
}
FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
- feedPolicy, runtimeType);
+ feedPolicy, runtimeType, operationId);
oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
}