Merge branch 'master' into raman/master_feeds_711
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
new file mode 100644
index 0000000..d84c49b
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
@@ -0,0 +1,41 @@
+/*
+ * Description  : Create a feed using the synthetic feed simulator adapter. 
+                  Create a dataset that has an associated rtree index.
+                  The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed. 
+                  The feed lasts a configured duration with data arriving at a configured rate (tweets per second). 
+                  Verify the existence of data after the feed finishes.
+ * Issue        : 711                 
+ * Expected Res : Success
+ * Date         : 8th Feb 2014
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+create type TwitterUserType as closed {
+  screen-name: string,
+  lang: string,
+  friends_count: int32,
+  statuses_count: int32,
+  name: string,
+  followers_count: int32
+}
+
+create type TweetMessageType as closed {
+  tweetid: int64,
+  user: TwitterUserType,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string
+}
+
+create dataset SyntheticTweets(TweetMessageType)
+primary key tweetid;
+
+create index locationIdx on SyntheticTweets(sender-location) type rtree;
+
+create feed SyntheticTweetFeed
+using twitter_firehose
+(("duration"="5"),("tps"="50"),("tput-duration"="5"),("dataverse-dataset"="feeds:SyntheticTweets"),("mode"="controlled"));
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.2.update.aql
new file mode 100644
index 0000000..c52867a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.2.update.aql
@@ -0,0 +1,16 @@
+/*
+ * Description  : Create a feed using the synthetic feed simulator adapter.                   
+                  Create a dataset that has an associated rtree index.
+                  The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed.
+                  The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+                  Verify the existence of data after the feed finishes.
+ * Issue        : 711
+ * Expected Res : Success
+ * Date         : 8th Feb 2014
+ */
+  
+use dataverse feeds;
+
+set wait-for-completion-feed "true"; /* presumably makes the connect below block until the feed ends -- confirm */
+
+connect feed SyntheticTweetFeed to dataset SyntheticTweets; /* feed and dataset are created by the .1.ddl step */
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.3.query.aql
new file mode 100644
index 0000000..9dcafda
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.3.query.aql
@@ -0,0 +1,22 @@
+/*
+ * Description  : Create a feed using the synthetic feed simulator adapter.                   
+                  Create a dataset that has an associated rtree index.
+                  The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed.
+                  The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+                  Verify the existence of data after the feed finishes.
+ * Issue        : 711
+ * Expected Res : Success
+ * Date         : 8th Feb 2014
+ */
+
+use dataverse feeds;
+
+let $totalTweets := count(
+  for $tweet in dataset('SyntheticTweets')
+  return $tweet
+)
+return (
+  if ($totalTweets > 0)
+    then 1
+    else 0
+)
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.1.ddl.aql
new file mode 100644
index 0000000..42a16d7
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.1.ddl.aql
@@ -0,0 +1,30 @@
+/*
+ * Description  : Create a dataset with a secondary btree index.
+                  Create a feed that uses the file_feed adaptor.
+                  The file_feed adaptor simulates a feed from a file in the HDFS.
+                  Connect the feed to the dataset and verify contents of the dataset post completion.
+ * Issue        : 711
+ * Expected Res : Success
+ * Date         : 6th Feb 2014
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+create type TweetType as closed {
+  id: string,
+  username: string,
+  location: string,
+  text: string,
+  timestamp: string
+}
+
+create dataset Tweets(TweetType)
+primary key id;
+
+create index usernameIdx on Tweets(username) type btree;
+
+create feed TweetFeed
+using file_feed
+(("fs"="hdfs"),("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/obamatweets.adm"),("format"="adm"),("input-format"="text-input-format"),("type-name"="TweetType"),("tuple-interval"="10"));
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.2.update.aql
new file mode 100644
index 0000000..cd9a45e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.2.update.aql
@@ -0,0 +1,15 @@
+/*
+ * Description  : Create a dataset with a secondary btree index.
+                  Create a feed that uses the file_feed adaptor.
+                  The file_feed adaptor simulates a feed from a file in the HDFS.
+                  Connect the feed to the dataset and verify contents of the dataset post completion.
+ * Issue        : 711
+ * Expected Res : Success
+ * Date         : 6th Feb 2014
+ */
+  
+use dataverse feeds;
+
+set wait-for-completion-feed "true"; /* presumably makes the connect below block until the feed ends -- confirm */
+
+connect feed TweetFeed to dataset Tweets; /* feed and dataset are created by the .1.ddl step */
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.3.query.aql
new file mode 100644
index 0000000..62dfb82
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/issue_711_feeds/issue_711_feeds.3.query.aql
@@ -0,0 +1,14 @@
+/*
+ * Description  : Create a dataset with a secondary btree index.
+                  Create a feed that uses the file_feed adaptor.
+                  The file_feed adaptor simulates a feed from a file in the HDFS.
+                  Connect the feed to the dataset and verify contents of the dataset post completion.
+ * Issue        : 711
+ * Expected Res : Success
+ * Date         : 6th Feb 2014
+ */
+use dataverse feeds;
+
+for $tweet in dataset('Tweets')
+order by $tweet.id
+return $tweet
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/issue_711_feeds/issue_711_feeds.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/issue_711_feeds/issue_711_feeds.1.adm
new file mode 100644
index 0000000..2567483
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/issue_711_feeds/issue_711_feeds.1.adm
@@ -0,0 +1,11 @@
+{ "id": "nc1:1", "username": "BronsonMike", "location": "", "text": "@GottaLaff @reutersus Christie and obama just foul weather friends", "timestamp": "Thu Dec 06 16:53:06 PST 2012" }
+{ "id": "nc1:100", "username": "KidrauhlProuds", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson  uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
+{ "id": "nc1:102", "username": "jaysauce82", "location": "", "text": "Not voting for President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
+{ "id": "nc1:104", "username": "princeofsupras", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson e uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:15 PST 2012" }
+{ "id": "nc1:106", "username": "GulfDogs", "location": "", "text": "Obama Admin Knew Libyan Terrorists Had US-Provided Weaponsteaparty #tcot #ccot #NewGuards #BreitbartArmy #patriotwttp://t.co/vJxzrQUE", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
+{ "id": "nc1:108", "username": "Laugzpz", "location": "", "text": "@AlfredoJalife Maestro Obama se hace de la vista gorda, es un acuerdo de siempre creo yo.", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" }
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 067fddb..58c0d38 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -4455,6 +4455,11 @@
         <output-dir compare="Text">issue_230_feeds</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="issue_711_feeds">
+        <output-dir compare="Text">issue_711_feeds</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="hdfs">
     <test-case FilePath="hdfs">
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
index 43afd27..57479f3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
@@ -35,16 +35,19 @@
 
     private static final long serialVersionUID = 1L;
 
+    private final String indexName; // the name of the index that is being operated upon
+    
     public AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec,
             RecordDescriptor recDesc, IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
             IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
             IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
             IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
-            ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory) {
+            ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory, String indexName) {
         super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
                 fieldPermutation, op, dataflowHelperFactory, tupleFilterFactory, modificationOpCallbackFactory);
+        this.indexName = indexName;
     }
 
     @Override
@@ -53,4 +56,8 @@
         return new AsterixLSMInsertDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
                 recordDescProvider, op, false);
     }
+
+    public String getIndexName() {
+        return indexName;
+    }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
index 6f6a7b2..b18b90a 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
@@ -34,18 +34,22 @@
 
     private static final long serialVersionUID = 1L;
 
+    /** the name of the index that is being operated upon **/
+    private final String indexName;
+
     private final boolean isPrimary;
 
-    public AsterixLSMTreeInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
-            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
-            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
-            IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
-            IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
+    public AsterixLSMTreeInsertDeleteOperatorDescriptor(String indexName, IOperatorDescriptorRegistry spec,
+            RecordDescriptor recDesc, IStorageManagerInterface storageManager,
+            IIndexLifecycleManagerProvider lifecycleManagerProvider, IFileSplitProvider fileSplitProvider,
+            ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields,
+            int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
             ITupleFilterFactory tupleFilterFactory,
             IModificationOperationCallbackFactory modificationOpCallbackProvider, boolean isPrimary) {
         super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, bloomFilterKeyFields, fieldPermutation, op, dataflowHelperFactory,
                 tupleFilterFactory, modificationOpCallbackProvider);
+        this.indexName = indexName;
         this.isPrimary = isPrimary;
     }
 
@@ -56,6 +60,10 @@
                 recordDescProvider, op, isPrimary);
     }
 
+    public String getIndexName() {
+        return indexName;
+    }
+
     public boolean isPrimary() {
         return isPrimary;
     }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
index d1b2faf..88e1db5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
@@ -34,12 +34,16 @@
     protected FeedRuntimeState runtimeState;
 
     public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType) {
-        this.feedRuntimeId = new FeedRuntimeId(feedRuntimeType, feedId, partition);
+        this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, partition);
     }
 
-    public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType,
+    public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType, String operandId) {
+        this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, operandId, partition);
+    }
+
+    public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType, String operandId,
             FeedRuntimeState runtimeState) {
-        this.feedRuntimeId = new FeedRuntimeId(feedRuntimeType, feedId, partition);
+        this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, operandId, partition);
         this.runtimeState = runtimeState;
     }
 
@@ -88,13 +92,24 @@
 
     public static class FeedRuntimeId {
 
+        public static final String DEFAULT_OPERATION_ID = "N/A";
         private final FeedRuntimeType feedRuntimeType;
+        private final String operandId;
         private final FeedConnectionId feedId;
         private final int partition;
         private final int hashCode;
 
-        public FeedRuntimeId(FeedRuntimeType runtimeType, FeedConnectionId feedId, int partition) {
+        public FeedRuntimeId(FeedConnectionId feedId, FeedRuntimeType runtimeType, String operandId, int partition) {
             this.feedRuntimeType = runtimeType;
+            this.operandId = operandId; // compared in equals(), but not part of the cached hash below
+            this.feedId = feedId;
+            this.partition = partition;
+            this.hashCode = (feedId + "[" + partition + "]" + feedRuntimeType).hashCode(); // computed once, in the constructor
+        }
+
+        public FeedRuntimeId(FeedConnectionId feedId, FeedRuntimeType runtimeType, int partition) {
+            this.feedRuntimeType = runtimeType;
+            this.operandId = DEFAULT_OPERATION_ID;
             this.feedId = feedId;
             this.partition = partition;
             this.hashCode = (feedId + "[" + partition + "]" + feedRuntimeType).hashCode();
@@ -102,7 +117,7 @@
 
         @Override
         public String toString() {
-            return feedId + "[" + partition + "]" + " " + feedRuntimeType;
+            return feedId + "[" + partition + "]" + " " + feedRuntimeType + "(" + operandId + ")";
         }
 
         @Override
@@ -115,7 +130,7 @@
             if (o instanceof FeedRuntimeId) {
                 FeedRuntimeId oid = ((FeedRuntimeId) o);
                 return oid.getFeedId().equals(feedId) && oid.getFeedRuntimeType().equals(feedRuntimeType)
-                        && oid.getPartition() == partition;
+                        && oid.getOperandId().equals(operandId) && oid.getPartition() == partition;
             }
             return false;
         }
@@ -128,6 +143,10 @@
             return feedId;
         }
 
+        public String getOperandId() {
+            return operandId;
+        }
+
         public int getPartition() {
             return partition;
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 1fb64be..63aa5db 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -862,7 +862,7 @@
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
             AsterixLSMTreeInsertDeleteOperatorDescriptor insertDeleteOp = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
-                    spec, recordDesc, appContext.getStorageManagerInterface(),
+                    indexName, spec, recordDesc, appContext.getStorageManagerInterface(),
                     appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                     comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
                     new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
@@ -1057,7 +1057,7 @@
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
             AsterixLSMTreeInsertDeleteOperatorDescriptor btreeBulkLoad = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
-                    spec, recordDesc, appContext.getStorageManagerInterface(),
+                    indexName, spec, recordDesc, appContext.getStorageManagerInterface(),
                     appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                     comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
                     new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
@@ -1204,7 +1204,7 @@
                     spec, recordDesc, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
                     appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
                     invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
-                    indexDataFlowFactory, filterFactory, modificationCallbackFactory);
+                    indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
                     splitsAndConstraint.second);
         } catch (MetadataException e) {
@@ -1287,7 +1287,7 @@
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
             AsterixLSMTreeInsertDeleteOperatorDescriptor rtreeUpdate = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
-                    spec, recordDesc, appContext.getStorageManagerInterface(),
+                    indexName, spec, recordDesc, appContext.getStorageManagerInterface(),
                     appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                     comparatorFactories, null, fieldPermutation, indexOp, new LSMRTreeDataflowHelperFactory(
                             valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index b732191..7c25be8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -100,7 +100,7 @@
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
         IFeedAdapter adapter = null;
-        FeedRuntimeId feedRuntimeId = new FeedRuntimeId(FeedRuntimeType.INGESTION, feedId, partition);
+        FeedRuntimeId feedRuntimeId = new FeedRuntimeId(feedId, FeedRuntimeType.INGESTION, partition);
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
         this.feedManager = runtimeCtx.getFeedManager();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 47b00dd..6380269 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -57,7 +57,7 @@
     public void initialize() throws HyracksDataException {
         try {
             writer.open();
-            FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.INGESTION, feedId, partition);
+            FeedRuntimeId runtimeId = new FeedRuntimeId(feedId, FeedRuntimeType.INGESTION, partition);
             FeedRuntime feedRuntime = feedManager.getFeedRuntime(runtimeId);
             boolean ingestionLocation = feedRuntime != null;
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 1a8a460..d895d5c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -62,8 +62,11 @@
      */
     private final FeedRuntimeType runtimeType;
 
+    private final String operationId;
+
     public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
-            IOperatorDescriptor coreOperatorDescriptor, FeedPolicy feedPolicy, FeedRuntimeType runtimeType) {
+            IOperatorDescriptor coreOperatorDescriptor, FeedPolicy feedPolicy, FeedRuntimeType runtimeType,
+            String operationId) {
         super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
         this.feedConnectionId = feedConnectionId;
         this.feedPolicy = feedPolicy;
@@ -72,13 +75,14 @@
         }
         this.coreOperator = coreOperatorDescriptor;
         this.runtimeType = runtimeType;
+        this.operationId = operationId;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
-                feedConnectionId, feedPolicy, runtimeType);
+                feedConnectionId, feedPolicy, runtimeType, operationId);
     }
 
     @Override
@@ -131,9 +135,11 @@
         /** The (singleton) instance of IFeedManager **/
         private IFeedManager feedManager;
 
+        private final String operandId;
+
         public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
                 int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-                FeedPolicy feedPolicy, FeedRuntimeType runtimeType) throws HyracksDataException {
+                FeedPolicy feedPolicy, FeedRuntimeType runtimeType, String operationId) throws HyracksDataException {
             this.coreOperatorNodePushable = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
                     .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
             this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicy.getProperties());
@@ -145,15 +151,16 @@
             IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                     .getApplicationContext().getApplicationObject();
             this.feedManager = runtimeCtx.getFeedManager();
+            this.operandId = operationId;
         }
 
         @Override
         public void open() throws HyracksDataException {
-            FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, feedId, partition);
+            FeedRuntimeId runtimeId = new FeedRuntimeId(feedId, runtimeType, operandId, partition);
             try {
                 feedRuntime = feedManager.getFeedRuntime(runtimeId);
                 if (feedRuntime == null) {
-                    feedRuntime = new FeedRuntime(feedId, partition, runtimeType);
+                    feedRuntime = new FeedRuntime(feedId, partition, runtimeType, operandId);
                     feedManager.registerFeedRuntime(feedRuntime);
                     if (LOGGER.isLoggable(Level.WARNING)) {
                         LOGGER.warning("Did not find a saved state from a previous zombie, starting a new instance for "
@@ -183,12 +190,14 @@
         public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
             try {
                 if (resumeOldState) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("State from previous zombie instance "
-                                + feedRuntime.getRuntimeState().getFrame());
+                    FeedRuntimeState runtimeState = feedRuntime.getRuntimeState(); // read once, reuse below
+                    if (runtimeState != null) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("State from previous zombie instance " + runtimeState);
+                        }
+                        coreOperatorNodePushable.nextFrame(runtimeState.getFrame()); // null-checked snapshot
+                        feedRuntime.setRuntimeState(null);
                     }
-                    coreOperatorNodePushable.nextFrame(feedRuntime.getRuntimeState().getFrame());
-                    feedRuntime.setRuntimeState(null);
                     resumeOldState = false;
                 }
                 currentBuffer = buffer;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index 6e0ed83..b90038d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -26,8 +26,10 @@
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import edu.uci.ics.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
 import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
 import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntime;
 import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -91,8 +93,10 @@
         Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
 
         // copy operators
+        String operationId = null;
         Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
         for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
+            operationId = FeedRuntime.FeedRuntimeId.DEFAULT_OPERATION_ID;
             IOperatorDescriptor opDesc = entry.getValue();
             if (opDesc instanceof FeedIntakeOperatorDescriptor) {
                 FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
@@ -107,8 +111,14 @@
                 }
                 oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
             } else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
+                operationId = ((AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
                 FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
-                        feedPolicy, FeedRuntimeType.STORAGE);
+                        feedPolicy, FeedRuntimeType.STORAGE, operationId);
+                oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+            } else if (opDesc instanceof AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) {
+                operationId = ((AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) opDesc).getIndexName();
+                FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
+                        feedPolicy, FeedRuntimeType.STORAGE, operationId);
                 oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
             } else {
                 FeedRuntimeType runtimeType = null;
@@ -122,7 +132,7 @@
                     }
                 }
                 FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
-                        feedPolicy, runtimeType);
+                        feedPolicy, runtimeType, operationId);
 
                 oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
             }