Feed Policy Refactoring

1. Fix the frame size inconsistency in the feed dataflow.
2. Add a runtime test case for creating a feed with a policy.
3. Fix the FeedRuntimeInputHandler flush() logic so that flushing happens
   only after the writer is opened.
4. Refactor FeedPolicyAccessor and BuiltinFeedPolicies. Only the spill and
   discard policies remain (see the accessor sketch below).
5. Remove FeedPolicyEnforcer and merge its functionality into
   FeedPolicyAccessor.
6. Revise SocketServerInputStream to handle the expected exception more
   gracefully.
7. Fix a failing test case in change feed.
8. Refactor the FeedRuntimeInputHandler consumer-thread logic and change the
   mechanism used to poison the consumer thread (see the shutdown sketch
   below).

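As a point of reference, below is a minimal, self-contained sketch of how the
refactored FeedPolicyAccessor resolves boolean policy properties after this
change: the caller-supplied default is honored when a key is absent instead of
always falling back to false. The wrapper class and main method are
illustrative only; the property keys mirror the ones used in the diff.

import java.util.HashMap;
import java.util.Map;

public class PolicyAccessorSketch {
    private final Map<String, String> feedPolicy;

    public PolicyAccessorSketch(Map<String, String> feedPolicy) {
        this.feedPolicy = feedPolicy;
    }

    // Mirrors the corrected getBooleanPropertyValue(): a missing key now
    // yields the caller-supplied default instead of false.
    public boolean getBoolean(String key, boolean defValue) {
        String v = feedPolicy.get(key);
        return v == null ? defValue : Boolean.valueOf(v);
    }

    public static void main(String[] args) {
        Map<String, String> policy = new HashMap<>();
        policy.put("spill.to.disk.on.congestion", "true");
        PolicyAccessorSketch fpa = new PolicyAccessorSketch(policy);
        System.out.println(fpa.getBoolean("spill.to.disk.on.congestion", false)); // true
        System.out.println(fpa.getBoolean("flowcontrol.enabled", true));          // true (default)
    }
}
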
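The consumer-thread shutdown now works by enqueueing a zero-capacity frame
rather than setting a poisoned flag. The following is an illustrative sketch
of that protocol; the class, method, and thread names are hypothetical, and
the real handler additionally drains spilled frames and coordinates with the
producer through a mutex.

import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonFrameSketch {
    private static final ByteBuffer POISON = ByteBuffer.allocate(0);
    private final LinkedBlockingQueue<ByteBuffer> inbox = new LinkedBlockingQueue<>();
    private final Thread consumerThread = new Thread(this::consume, "FrameTransporter-sketch");

    public void open() {
        // As in the patched handler, the consumer thread starts only once the
        // downstream writer has been opened.
        consumerThread.start();
    }

    public void nextFrame(ByteBuffer frame) throws InterruptedException {
        inbox.put(frame);
    }

    public void close() throws InterruptedException {
        inbox.put(POISON);     // wake the consumer and signal end of stream
        consumerThread.join(); // wait until all pending frames are consumed
    }

    private void consume() {
        try {
            while (true) {
                ByteBuffer frame = inbox.take();
                if (frame.capacity() == 0) {
                    break; // poison frame: stop consuming
                }
                System.out.println("processing frame of " + frame.capacity() + " bytes");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PoisonFrameSketch handler = new PoisonFrameSketch();
        handler.open();
        handler.nextFrame(ByteBuffer.allocate(128));
        handler.nextFrame(ByteBuffer.allocate(256));
        handler.close();
    }
}
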
Change-Id: Ibc10139925cfedee66d1263990ba80b94675f182
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1591
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 874cbb1..8b3b020 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -110,13 +111,15 @@
  */
 public class FeedOperations {
 
+    private static final CompilerProperties compilerProperties = AppContextInfo.INSTANCE.getCompilerProperties();
+
     private FeedOperations() {
     }
 
     private static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed feed,
             MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
         JobSpecification spec = RuntimeUtils.createJobSpecification();
-        spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
+        spec.setFrameSize(compilerProperties.getFrameSize());
         IAdapterFactory adapterFactory;
         IOperatorDescriptor feedIngestor;
         AlgebricksPartitionConstraint ingesterPc;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.1.ddl.aql
new file mode 100644
index 0000000..4e21323
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.1.ddl.aql
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use dataverse experiments;
+
+create type TwitterUserType as closed {
+    screen-name: string,
+    lang: string,
+    friends_count: int32,
+    statuses_count: int32,
+    name: string,
+    followers_count: int32
+}
+
+create type TweetMessageType as closed {
+    tweetid: string,
+    tweetid-copy:string,
+    user: TwitterUserType,
+    sender-location: point,
+    send-time: datetime,
+    send-time-copy:datetime,
+    referred-topics: {{ string }},
+    message-text: string
+}
+
+create dataset Tweets(TweetMessageType) primary key tweetid;
+
+create feed TweetFeed using socket_adapter
+(
+    ("sockets"="127.0.0.1:10001"),
+    ("address-type"="IP"),
+    ("type-name"="TweetMessageType"),
+    ("format"="adm")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.2.update.aql
new file mode 100644
index 0000000..40ebc90
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.2.update.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+
+use dataverse experiments;
+set wait-for-completion-feed "false";
+
+connect feed TweetFeed to dataset Tweets using policy Discard;
+
+start feed TweetFeed;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.3.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.3.server.aql
new file mode 100644
index 0000000..c78da31
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.3.server.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+start client 10001 file-client 127.0.0.1 ../asterix-app/data/twitter/tw_messages.adm 500 50 1000
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.4.sleep.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.4.sleep.aql
new file mode 100644
index 0000000..7c1ab60
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.4.sleep.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+5000
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.5.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.5.update.aql
new file mode 100644
index 0000000..167c507
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.5.update.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+
+use dataverse experiments;
+stop feed TweetFeed;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.6.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.6.query.aql
new file mode 100644
index 0000000..b67042d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.6.query.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+
+use dataverse experiments;
+
+for $x in dataset Tweets
+order by $x.tweetid
+return $x;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.7.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.7.server.aql
new file mode 100644
index 0000000..cc02ce6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.7.server.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+
+stop 10001
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.8.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.8.ddl.aql
new file mode 100644
index 0000000..406fbbb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.8.ddl.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+
+use dataverse experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.2.update.sqlpp
index 9f60585..d679188 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.2.update.sqlpp
@@ -21,6 +21,6 @@
 
 set `wait-for-completion-feed` `true`;
 
-connect  feed SyntheticTweetFeed to  dataset SyntheticTweets using policy `BasicFT`;
+connect  feed SyntheticTweetFeed to  dataset SyntheticTweets using policy `Basic`;
 
 start feed SyntheticTweetFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.2.update.sqlpp
index 0a940bd..b449744 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.2.update.sqlpp
@@ -19,5 +19,5 @@
 
 use feeds_09;
 set `wait-for-completion-feed` `true`;
-connect  feed SyntheticTweetFeed to  dataset SyntheticTweets using policy `BasicFT`;
+connect  feed SyntheticTweetFeed to  dataset SyntheticTweets using policy `Basic`;
 start feed SyntheticTweetFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_10/feeds_10.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_10/feeds_10.2.update.sqlpp
index 4c5cc66..011a986 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_10/feeds_10.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_10/feeds_10.2.update.sqlpp
@@ -19,5 +19,5 @@
 
 use feeds_10;
 set `wait-for-completion-feed` `true`;
-connect  feed TweetFeed to  dataset Tweets using policy `BasicFT`;
+connect  feed TweetFeed to  dataset Tweets using policy `Basic`;
 start feed TweetFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_11/feeds_11.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_11/feeds_11.2.update.sqlpp
index 7af2dbd..327a222 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_11/feeds_11.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_11/feeds_11.2.update.sqlpp
@@ -19,5 +19,5 @@
 
 use feeds;
 set `wait-for-completion-feed` `true`;
-connect  feed feeds.TweetFeed to  dataset feeds.Tweets using policy `BasicFT`;
+connect  feed feeds.TweetFeed to  dataset feeds.Tweets using policy `Basic`;
 start feed feeds.TweetFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_12/feeds_12.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_12/feeds_12.2.update.sqlpp
index a3f0923..f5d9649 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_12/feeds_12.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_12/feeds_12.2.update.sqlpp
@@ -19,5 +19,5 @@
 
 use feeds_12;
 set `wait-for-completion-feed` `true`;
-connect  feed TweetFeed to  dataset Tweets using policy `BasicFT`;
+connect  feed TweetFeed to  dataset Tweets using policy `Basic`;
 start feed TweetFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/issue_230_feeds/issue_230_feeds.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/issue_230_feeds/issue_230_feeds.2.update.sqlpp
index 7af2dbd..327a222 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/issue_230_feeds/issue_230_feeds.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/issue_230_feeds/issue_230_feeds.2.update.sqlpp
@@ -19,5 +19,5 @@
 
 use feeds;
 set `wait-for-completion-feed` `true`;
-connect  feed feeds.TweetFeed to  dataset feeds.Tweets using policy `BasicFT`;
+connect  feed feeds.TweetFeed to  dataset feeds.Tweets using policy `Basic`;
 start feed feeds.TweetFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_7/issue_251_dataset_hint_7.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_7/issue_251_dataset_hint_7.2.update.sqlpp
index c5f0625..4915658 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_7/issue_251_dataset_hint_7.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_7/issue_251_dataset_hint_7.2.update.sqlpp
@@ -29,6 +29,6 @@
 
 set `wait-for-completion-feed` `true`;
 
-connect  feed TweetFeed to  dataset Tweets using policy `BasicFT`;
+connect  feed TweetFeed to  dataset Tweets using policy `Basic`;
 
 start feed TweetFeed;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/create-feed-with-policy/create-feed-with-policy.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/create-feed-with-policy/create-feed-with-policy.1.adm
new file mode 100644
index 0000000..6466feb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/create-feed-with-policy/create-feed-with-policy.1.adm
@@ -0,0 +1,10 @@
+{ "tweetid": "1", "tweetid-copy": "1", "user": { "screen-name": "RollandEckhardstein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 268, "name": "Rolland Eckhardstein", "followers_count": 3311368 }, "sender-location": point("42.13,80.43"), "send-time": datetime("2005-12-05T21:06:41.000Z"), "send-time-copy": datetime("2005-12-05T21:06:41.000Z"), "referred-topics": {{ "samsung", "plan" }}, "message-text": " love samsung the plan is amazing" }
+{ "tweetid": "10", "tweetid-copy": "10", "user": { "screen-name": "Rolldstein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 268, "name": "Rolland Eckhardstful", "followers_count": 3311368 }, "sender-location": point("46.94,93.98"), "send-time": datetime("2011-04-07T14:08:46.000Z"), "send-time-copy": datetime("2011-04-07T14:08:46.000Z"), "referred-topics": {{ "t-mobile", "signal" }}, "message-text": " like t-mobile the signal is good" }
+{ "tweetid": "2", "tweetid-copy": "2", "user": { "screen-name": "RollandEckhardstein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 268, "name": "David Eckhardstein", "followers_count": 3311368 }, "sender-location": point("28.86,70.44"), "send-time": datetime("2007-08-15T06:44:17.000Z"), "send-time-copy": datetime("2007-08-15T06:44:17.000Z"), "referred-topics": {{ "sprint", "voice-clarity" }}, "message-text": " like sprint its voice-clarity is mind-blowing" }
+{ "tweetid": "3", "tweetid-copy": "3", "user": { "screen-name": "RollandEckhard#500", "lang": "en", "friends_count": 3657079, "statuses_count": 268, "name": "Rolland Hetfield", "followers_count": 3311368 }, "sender-location": point("39.84,86.48"), "send-time": datetime("2008-12-24T00:07:04.000Z"), "send-time-copy": datetime("2008-12-24T00:07:04.000Z"), "referred-topics": {{ "verizon", "voice-command" }}, "message-text": " can't stand verizon its voice-command is terrible:(" }
+{ "tweetid": "4", "tweetid-copy": "4", "user": { "screen-name": "RollandEckhardstein#221", "lang": "en", "friends_count": 3657079, "statuses_count": 268, "name": "Rolland Eckhardstinz", "followers_count": 3311368 }, "sender-location": point("27.67,87.32"), "send-time": datetime("2007-02-05T16:39:13.000Z"), "send-time-copy": datetime("2007-02-05T16:39:13.000Z"), "referred-topics": {{ "t-mobile", "customer-service" }}, "message-text": " love t-mobile its customer-service is mind-blowing" }
+{ "tweetid": "5", "tweetid-copy": "5", "user": { "screen-name": "RollandEcstein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 268, "name": "Rolland Eckhardst", "followers_count": 3311368 }, "sender-location": point("27.3,92.77"), "send-time": datetime("2010-09-12T06:15:28.000Z"), "send-time-copy": datetime("2010-09-12T06:15:28.000Z"), "referred-topics": {{ "t-mobile", "customization" }}, "message-text": " like t-mobile the customization is amazing:)" }
+{ "tweetid": "6", "tweetid-copy": "6", "user": { "screen-name": "Rollkhardstein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 268, "name": "Kirk Hammette ", "followers_count": 3311368 }, "sender-location": point("45.62,84.78"), "send-time": datetime("2012-01-23T06:23:13.000Z"), "send-time-copy": datetime("2012-01-23T06:23:13.000Z"), "referred-topics": {{ "iphone", "network" }}, "message-text": " like iphone its network is awesome:)" }
+{ "tweetid": "7", "tweetid-copy": "7", "user": { "screen-name": "andEckhardstein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 268, "name": "Rolland khardstein", "followers_count": 3311368 }, "sender-location": point("44.12,81.46"), "send-time": datetime("2012-02-17T17:30:26.000Z"), "send-time-copy": datetime("2012-02-17T17:30:26.000Z"), "referred-topics": {{ "t-mobile", "network" }}, "message-text": " hate t-mobile the network is bad" }
+{ "tweetid": "8", "tweetid-copy": "8", "user": { "screen-name": "Rolltein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 268, "name": "Ron Eckhardstein", "followers_count": 3311368 }, "sender-location": point("36.86,90.71"), "send-time": datetime("2009-03-12T13:18:04.000Z"), "send-time-copy": datetime("2009-03-12T13:18:04.000Z"), "referred-topics": {{ "at&t", "touch-screen" }}, "message-text": " dislike at&t its touch-screen is OMG" }
+{ "tweetid": "9", "tweetid-copy": "9", "user": { "screen-name": "Roldstein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 268, "name": "Rolland Eckdstein", "followers_count": 3311368 }, "sender-location": point("29.07,97.05"), "send-time": datetime("2012-08-15T20:19:46.000Z"), "send-time-copy": datetime("2012-08-15T20:19:46.000Z"), "referred-topics": {{ "verizon", "speed" }}, "message-text": " hate verizon its speed is bad" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
index 573793f..cbde66d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "DatasetName": "Tweets", "ReturnType": "TweetType: closed {\n  id: string,\n  username: string,\n  location: string,\n  text: string,\n  timestamp: string\n}\n", "AppliedFunctions": {{ "feed_processor" }}, "PolicyName": "BasicFT" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "DatasetName": "Tweets", "ReturnType": "TweetType: closed {\n  id: string,\n  username: string,\n  location: string,\n  text: string,\n  timestamp: string\n}\n", "AppliedFunctions": {{ "feed_processor" }}, "PolicyName": "Basic" }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 9982477..329451d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -19,7 +19,7 @@
 package org.apache.asterix.external.feed.dataflow;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -59,7 +59,7 @@
     private final int initialFrameSize;
     private final FrameTransporter consumer;
     private final Thread consumerThread;
-    private final LinkedBlockingDeque<ByteBuffer> inbox;
+    private final LinkedBlockingQueue<ByteBuffer> inbox;
     private final ConcurrentFramePool framePool;
     private Mode mode = Mode.PROCESS;
     private int total = 0;
@@ -72,7 +72,6 @@
             IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
             throws HyracksDataException {
         this.writer = writer;
-
         this.spiller = fpa.spillToDiskOnCongestion()
                 ? new FrameSpiller(ctx,
                         connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
@@ -82,10 +81,9 @@
         this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
         this.fpa = fpa;
         this.framePool = framePool;
-        this.inbox = new LinkedBlockingDeque<>();
+        this.inbox = new LinkedBlockingQueue<>();
         this.consumer = new FrameTransporter();
-        this.consumerThread = new Thread(consumer);
-        this.consumerThread.start();
+        this.consumerThread = new Thread(consumer, "FeedRuntimeInputHandler-FrameTransporter");
         this.initialFrameSize = ctx.getInitialFrameSize();
         this.frameAction = new FrameAction();
     }
@@ -94,6 +92,7 @@
     public void open() throws HyracksDataException {
         synchronized (writer) {
             writer.open();
+            consumerThread.start();
         }
     }
 
@@ -106,14 +105,18 @@
 
     @Override
     public void close() throws HyracksDataException {
-        consumer.poison();
-        synchronized (mutex) {
-            if (DEBUG) {
-                LOGGER.info("Producer is waking up consumer");
-            }
-            mutex.notify();
-        }
         try {
+            // Put the poison frame directly into the inbox rather than going
+            // through nextFrame(). Otherwise the frame could be flushed into
+            // the spill file, and its size information would be lost when the
+            // frame is read back.
+            inbox.put(ByteBuffer.allocate(0));
+            synchronized (mutex) {
+                if (DEBUG) {
+                    LOGGER.info("Producer is waking up consumer");
+                }
+                mutex.notify();
+            }
             consumerThread.join();
         } catch (InterruptedException e) {
             LOGGER.log(Level.WARNING, e.getMessage(), e);
@@ -391,9 +394,7 @@
 
     @Override
     public void flush() throws HyracksDataException {
-        synchronized (writer) {
-            writer.flush();
-        }
+        // no op
     }
 
     public int getNumDiscarded() {
@@ -415,16 +416,11 @@
     private class FrameTransporter implements Runnable {
         private volatile Throwable cause;
         private int consumed = 0;
-        private boolean poisoned = false;
 
         public Throwable cause() {
             return cause;
         }
 
-        public void poison() {
-            poisoned = true;
-        }
-
         private Throwable consume(ByteBuffer frame) {
             while (frame != null) {
                 try {
@@ -446,61 +442,59 @@
             return null;
         }
 
+        private boolean clearLocalFrames() throws HyracksDataException {
+            ByteBuffer frame = spiller.next();
+            while (frame != null) {
+                if (consume(frame) != null) {
+                    return false;
+                }
+                frame = spiller.next();
+            }
+            return true;
+        }
+
         @Override
         public void run() {
             try {
-                ByteBuffer frame = inbox.poll();
-                while (true) {
-                    if (frame != null) {
+                ByteBuffer frame;
+                boolean running = true;
+                while (running) {
+                    frame = inbox.poll();
+
+                    if (frame == null && spiller != null) {
+                        running = clearLocalFrames();
+                        continue;
+                    }
+
+                    if (frame == null) {
+                        synchronized (mutex) {
+                            LOGGER.info("Consumer is going to sleep");
+                            mutex.wait();
+                            LOGGER.info("Consumer is waking up");
+                        }
+                        continue;
+                    }
+
+                    // process
+                    if (frame.capacity() == 0) {
+                        running = false;
+                        if (spiller != null) {
+                            clearLocalFrames();
+                        }
+                    } else {
                         try {
                             if (consume(frame) != null) {
                                 return;
                             }
                         } finally {
-                            // Done with frame.
                             framePool.release(frame);
                         }
                     }
-                    frame = inbox.poll();
-                    if (frame == null) {
-                        // Memory queue is empty. Check spill
-                        if (spiller != null) {
-                            frame = spiller.next();
-                            while (frame != null) {
-                                if (consume(frame) != null) {
-                                    // We don't release the frame since this is a spill frame that we didn't get from memory
-                                    // manager
-                                    return;
-                                }
-                                frame = spiller.next();
-                            }
-                        }
-                        writer.flush();
-                        // At this point. We consumed all memory and spilled
-                        // We can't assume the next will be in memory. what if there is 0 memory?
-                        synchronized (mutex) {
-                            frame = inbox.poll();
-                            // Nothing in memory
-                            if (frame == null && (spiller == null || spiller.switchToMemory())) {
-                                if (poisoned) {
-                                    break;
-                                }
-                                if (DEBUG) {
-                                    LOGGER.info("Consumer is going to sleep");
-                                }
-                                // Nothing in disk
-                                mutex.wait();
-                                if (DEBUG) {
-                                    LOGGER.info("Consumer is waking up");
-                                }
-                            }
-                        }
-                    }
+                    writer.flush();
                 }
             } catch (Throwable th) {
                 this.cause = th;
             }
-            // cleanup will always be done through the close() call
         }
 
         @Override
@@ -513,7 +507,7 @@
         return total;
     }
 
-    public LinkedBlockingDeque<ByteBuffer> getInternalBuffer() {
+    public LinkedBlockingQueue<ByteBuffer> getInternalBuffer() {
         return inbox;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
index cb722bb..becf41d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
@@ -30,33 +30,12 @@
 
     /**
      * --------------------------
-     * failure configuration
-     * --------------------------
-     **/
-
-    /** continue feed ingestion after a soft (runtime) failure **/
-    public static final String SOFT_FAILURE_CONTINUE = "soft.failure.continue";
-
-    /** log failed tuple to an asterixdb dataset for future reference **/
-    public static final String SOFT_FAILURE_LOG_DATA = "soft.failure.log.data";
-
-    /** continue feed ingestion after loss of one or more machines (hardware failure) **/
-    public static final String HARDWARE_FAILURE_CONTINUE = "hardware.failure.continue";
-
-    /** auto-start a loser feed when the asterixdb instance is restarted **/
-    public static final String CLUSTER_REBOOT_AUTO_RESTART = "cluster.reboot.auto.restart";
-
-    /** framework provides guarantee that each received feed record will be processed through the ingestion pipeline at least once **/
-    public static final String AT_LEAST_ONE_SEMANTICS = "atleast.once.semantics";
-
-    /**
-     * --------------------------
      * flow control configuration
      * --------------------------
      **/
 
     /** enable buffering in feeds **/
-    public static final String BUFFERING_ENABLED = "buffering.enabled";
+    public static final String FLOWCONTROL_ENABLED = "flowcontrol.enabled";
 
     /** spill excess tuples to disk if an operator cannot process incoming data at its arrival rate **/
     public static final String SPILL_TO_DISK_ON_CONGESTION = "spill.to.disk.on.congestion";
@@ -70,18 +49,9 @@
     /** maximum fraction of ingested data that can be discarded **/
     public static final String MAX_FRACTION_DISCARD = "max.fraction.discard";
 
-    /** maximum end-to-end delay/latency in persisting a tuple through the feed ingestion pipeline **/
-    public static final String MAX_DELAY_RECORD_PERSISTENCE = "max.delay.record.persistence";
-
-    /** rate limit the inflow of tuples in accordance with the maximum capacity of the pipeline **/
-    public static final String THROTTLING_ENABLED = "throttling.enabled";
-
     /** elasticity **/
     public static final String ELASTIC = "elastic";
 
-    /** statistics **/
-    public static final String TIME_TRACKING = "time.tracking";
-
     /** logging of statistics **/
     public static final String LOGGING_STATISTICS = "logging.statistics";
 
@@ -101,31 +71,9 @@
         this.feedPolicy = feedPolicy;
     }
 
-    /** Failure recover/reporting **/
-
-    public boolean logDataOnSoftFailure() {
-        return getBooleanPropertyValue(SOFT_FAILURE_LOG_DATA, false);
-    }
-
-    public boolean continueOnSoftFailure() {
-        return getBooleanPropertyValue(SOFT_FAILURE_CONTINUE, false);
-    }
-
-    public boolean continueOnHardwareFailure() {
-        return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE, false);
-    }
-
-    public boolean autoRestartOnClusterReboot() {
-        return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART, false);
-    }
-
-    public boolean atleastOnceSemantics() {
-        return getBooleanPropertyValue(AT_LEAST_ONE_SEMANTICS, false);
-    }
-
     /** flow control **/
-    public boolean bufferingEnabled() {
-        return getBooleanPropertyValue(BUFFERING_ENABLED, false);
+    public boolean flowControlEnabled() {
+        return getBooleanPropertyValue(FLOWCONTROL_ENABLED, false);
     }
 
     public boolean spillToDiskOnCongestion() {
@@ -136,10 +84,6 @@
         return getMaxFractionDiscard() > 0;
     }
 
-    public boolean throttlingEnabled() {
-        return getBooleanPropertyValue(THROTTLING_ENABLED, false);
-    }
-
     public long getMaxSpillOnDisk() {
         return getLongPropertyValue(MAX_SPILL_SIZE_ON_DISK, NO_LIMIT);
     }
@@ -148,28 +92,9 @@
         return getFloatPropertyValue(MAX_FRACTION_DISCARD, 0);
     }
 
-    public long getMaxDelayRecordPersistence() {
-        return getLongPropertyValue(MAX_DELAY_RECORD_PERSISTENCE, Long.MAX_VALUE);
-    }
-
-    /** Elasticity **/
-    public boolean isElastic() {
-        return getBooleanPropertyValue(ELASTIC, false);
-    }
-
-    /** Statistics **/
-    public boolean isTimeTrackingEnabled() {
-        return getBooleanPropertyValue(TIME_TRACKING, false);
-    }
-
-    /** Logging of statistics **/
-    public boolean isLoggingStatisticsEnabled() {
-        return getBooleanPropertyValue(LOGGING_STATISTICS, false);
-    }
-
     private boolean getBooleanPropertyValue(String key, boolean defValue) {
         String v = feedPolicy.get(key);
-        return v == null ? false : Boolean.valueOf(v);
+        return v == null ? defValue : Boolean.valueOf(v);
     }
 
     private long getLongPropertyValue(String key, long defValue) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
deleted file mode 100644
index 3483da4..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.policy;
-
-import java.util.Map;
-
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FeedPolicyEnforcer {
-
-    private final FeedConnectionId connectionId;
-    private final FeedPolicyAccessor policyAccessor;
-
-    public FeedPolicyEnforcer(FeedConnectionId feedConnectionId, Map<String, String> feedPolicy) {
-        this.connectionId = feedConnectionId;
-        this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
-    }
-
-    public boolean continueIngestionPostSoftwareFailure(HyracksDataException e) throws HyracksDataException {
-        return policyAccessor.continueOnSoftFailure();
-    }
-
-    public FeedPolicyAccessor getFeedPolicyAccessor() {
-        return policyAccessor;
-    }
-
-    public FeedConnectionId getFeedId() {
-        return connectionId;
-    }
-
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index 964508f..ea55b5b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -71,7 +71,8 @@
             }
             read = connectionStream.read(b, off, len);
         } catch (IOException e) {
-            e.printStackTrace();
+            // An exception is expected when no connection is available.
+            LOGGER.info("Exhausted all pending connections. Waiting for new ones to come.");
             read = -1;
         }
         while (read < 0) {
@@ -155,11 +156,10 @@
     @Override
     public boolean handleException(Throwable th) {
         try {
-            accept();
+            return accept();
         } catch (IOException e) {
             LOGGER.warn("Failed accepting more connections", e);
             return false;
         }
-        return true;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index bdc11f5..f289361 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -61,7 +61,7 @@
             ActiveRuntimeId runtimeId =
                     new ActiveRuntimeId(connectionId.getFeedId(), FeedRuntimeType.COLLECT.toString(), partition);
             FrameTupleAccessor tAccessor = new FrameTupleAccessor(recordDesc);
-            if (policyAccessor.bufferingEnabled()) {
+            if (policyAccessor.flowControlEnabled()) {
                 writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, policyAccessor, tAccessor,
                         activeManager.getFramePool());
             } else {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index fbdbece..6732b15 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -30,7 +30,6 @@
 import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -55,10 +54,10 @@
     private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
 
     /**
-     * A policy enforcer that ensures dynamic decisions for a feed are taken
+     * A policy accessor that ensures dynamic decisions for a feed are taken
      * in accordance with the associated ingestion policy
      **/
-    private FeedPolicyEnforcer policyEnforcer;
+    private FeedPolicyAccessor policyAccessor;
 
     /**
      * A unique identifier for the feed instance. A feed instance represents
@@ -101,7 +100,7 @@
         this.ctx = ctx;
         this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
                 .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+        this.policyAccessor = new FeedPolicyAccessor(feedPolicyProperties);
         this.partition = partition;
         this.connectionId = feedConnectionId;
         this.feedManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext()
@@ -127,9 +126,9 @@
 
     private void initializeNewFeedRuntime(ActiveRuntimeId runtimeId) throws Exception {
         fta = new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0));
-        FeedPolicyAccessor fpa = policyEnforcer.getFeedPolicyAccessor();
+        FeedPolicyAccessor fpa = policyAccessor;
         coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        if (fpa.bufferingEnabled()) {
+        if (fpa.flowControlEnabled()) {
             writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator, fpa, fta,
                     feedManager.getFramePool());
         } else {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 0bb27db..f2193af 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -30,7 +30,7 @@
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -52,10 +52,10 @@
     private AbstractUnaryInputUnaryOutputOperatorNodePushable insertOperator;
 
     /**
-     * A policy enforcer that ensures dyanmic decisions for a feed are taken
+     * A policy accessor that ensures dynamic decisions for a feed are taken
      * in accordance with the associated ingestion policy
      **/
-    private final FeedPolicyEnforcer policyEnforcer;
+    private final FeedPolicyAccessor policyAccessor;
 
     /**
      * A unique identifier for the feed instance. A feed instance represents
@@ -94,7 +94,7 @@
         this.ctx = ctx;
         this.insertOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
                 .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+        this.policyAccessor = new FeedPolicyAccessor(feedPolicyProperties);
         this.partition = partition;
         this.connectionId = feedConnectionId;
         this.feedManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext()
@@ -130,9 +130,9 @@
                 return;
             }
         }
-        if (policyEnforcer.getFeedPolicyAccessor().bufferingEnabled()) {
+        if (policyAccessor.flowControlEnabled()) {
             writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, insertOperator,
-                    policyEnforcer.getFeedPolicyAccessor(), fta, feedManager.getFramePool());
+                    policyAccessor, fta, feedManager.getFramePool());
         } else {
             writer = new SyncFeedRuntimeInputHandler(ctx, insertOperator, fta);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
index cc21360..9538711 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
@@ -69,8 +69,4 @@
     public static final class NamingConstants {
         public static final String LIBRARY_NAME_SEPARATOR = "#";
     }
-
-    public static class JobConstants {
-        public static final int DEFAULT_FRAME_SIZE = 8192;
-    }
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index d407b8a..2237505 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -18,7 +18,11 @@
  */
 package org.apache.asterix.external.feed.test;
 
+import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -33,6 +37,8 @@
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -100,7 +106,7 @@
     private static FeedPolicyAccessor createFeedPolicyAccessor(boolean spill, boolean discard, long spillBudget,
             float discardFraction) {
         FeedPolicyAccessor fpa = Mockito.mock(FeedPolicyAccessor.class);
-        Mockito.when(fpa.bufferingEnabled()).thenReturn(true);
+        Mockito.when(fpa.flowControlEnabled()).thenReturn(true);
         Mockito.when(fpa.spillToDiskOnCongestion()).thenReturn(spill);
         Mockito.when(fpa.getMaxSpillOnDisk()).thenReturn(spillBudget);
         Mockito.when(fpa.discardOnCongestion()).thenReturn(discard);
@@ -108,6 +114,24 @@
         return fpa;
     }
 
+    private void cleanDiskFiles() throws IOException {
+        String filePrefix = "dataverse.feed(Feed)_dataset*";
+        Collection<File> files = FileUtils.listFiles(new File("."), new WildcardFileFilter(filePrefix), null);
+        for (File ifile : files) {
+            Files.deleteIfExists(ifile.toPath());
+        }
+    }
+
+    @org.junit.Before
+    public void testCleanBefore() throws IOException {
+        cleanDiskFiles();
+    }
+
+    @org.junit.After
+    public void testCleanAfter() throws IOException {
+        cleanDiskFiles();
+    }
+
     @org.junit.Test
     public void testZeroMemoryVarSizeFrameWithDiskNoDiscard() {
         try {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
index 57284b0..dfa00ab 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -27,26 +27,18 @@
 
 public class BuiltinFeedPolicies {
 
-    public static final FeedPolicyEntity BRITTLE = initializeBrittlePolicy();
-
     public static final FeedPolicyEntity BASIC = initializeBasicPolicy();
 
-    public static final FeedPolicyEntity BASIC_FT = initializeBasicFTPolicy();
-
-    public static final FeedPolicyEntity ADVANCED_FT = initializeAdvancedFTPolicy();
-
     public static final FeedPolicyEntity ADVANCED_FT_DISCARD = initializeAdvancedFTDiscardPolicy();
 
     public static final FeedPolicyEntity ADVANCED_FT_SPILL = initializeAdvancedFTSpillPolicy();
 
-    public static final FeedPolicyEntity ADVANCED_FT_THROTTLE = initializeAdvancedFTThrottlePolicy();
-
     public static final FeedPolicyEntity ELASTIC = initializeAdvancedFTElasticPolicy();
 
-    public static final FeedPolicyEntity[] policies = new FeedPolicyEntity[] { BRITTLE, BASIC, BASIC_FT, ADVANCED_FT,
-            ADVANCED_FT_DISCARD, ADVANCED_FT_SPILL, ADVANCED_FT_THROTTLE, ELASTIC };
+    public static final FeedPolicyEntity[] policies = new FeedPolicyEntity[] { BASIC, ADVANCED_FT_DISCARD,
+            ADVANCED_FT_SPILL, ELASTIC };
 
-    public static final FeedPolicyEntity DEFAULT_POLICY = BASIC_FT;
+    public static final FeedPolicyEntity DEFAULT_POLICY = BASIC;
 
     public static final String CONFIG_FEED_POLICY_KEY = "policy";
 
@@ -59,132 +51,48 @@
         return null;
     }
 
-    //Brittle
-    private static FeedPolicyEntity initializeBrittlePolicy() {
-        Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "false");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "false");
-        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
-        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
-
-        String description = "Brittle";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "Brittle", description, policyParams);
-    }
-
     //Basic
     private static FeedPolicyEntity initializeBasicPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
-        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
 
         String description = "Basic";
         return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "Basic", description, policyParams);
     }
 
-    // BasicFT
-    private static FeedPolicyEntity initializeBasicFTPolicy() {
-        Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
-        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, "false");
-        policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "1");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
-        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
-        policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "false");
-
-        String description = "Basic Monitored Fault-Tolerant";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "BasicFT", description, policyParams);
-    }
-
-    // AdvancedFT
-    private static FeedPolicyEntity initializeAdvancedFTPolicy() {
-        Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
-        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "true");
-        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "true");
-
-        String description = "Basic Monitored Fault-Tolerant with at least once semantics";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT", description, policyParams);
-    }
-
-    // AdvancedFT_Discard
+    // Discard
     private static FeedPolicyEntity initializeAdvancedFTDiscardPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        policyParams.put(FeedPolicyAccessor.FLOWCONTROL_ENABLED, "true");
         policyParams.put(FeedPolicyAccessor.MAX_SPILL_SIZE_ON_DISK, "false");
         policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "100");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
         policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
 
-        String description = "AdvancedFT 100% Discard during congestion";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Discard", description,
+        String description = "FlowControl 100% Discard during congestion";
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "Discard", description,
                 policyParams);
     }
 
-    // AdvancedFT_Spill
+    // Spill
     private static FeedPolicyEntity initializeAdvancedFTSpillPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        policyParams.put(FeedPolicyAccessor.FLOWCONTROL_ENABLED, "true");
         policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, "" + Boolean.TRUE);
         policyParams.put(FeedPolicyAccessor.MAX_SPILL_SIZE_ON_DISK, "" + FeedPolicyAccessor.NO_LIMIT);
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "true");
 
-        String description = "AdvancedFT 100% Discard during congestion";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Spill", description,
-                policyParams);
-    }
-
-    // AdvancedFT_Spill
-    private static FeedPolicyEntity initializeAdvancedFTThrottlePolicy() {
-        Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
-        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, "" + Boolean.FALSE);
-        policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "" + 0);
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
-        policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "true");
-
-        String description = "AdvancedFT Throttle during congestion";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Throttle", description,
+        String description = "FlowControl 100% Spill during congestion";
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "Spill", description,
                 policyParams);
     }
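
Aside (not part of the patch): with the built-in set trimmed to BASIC, ADVANCED_FT_DISCARD, ADVANCED_FT_SPILL, and ELASTIC, selecting a policy at runtime reduces to a name lookup with a fallback to the new default. A minimal sketch, assuming a getFeedPolicy(String) helper on BuiltinFeedPolicies (only its tail, "return null;", is visible at the top of this hunk):

    // Hypothetical caller-side lookup; "Spill" and "Discard" are the entity names
    // registered by the initializers in this file.
    FeedPolicyEntity requested = BuiltinFeedPolicies.getFeedPolicy("Spill");
    FeedPolicyEntity effective =
            requested != null ? requested : BuiltinFeedPolicies.DEFAULT_POLICY; // DEFAULT_POLICY is now BASIC
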
 
     // AdvancedFT_Elastic
     private static FeedPolicyEntity initializeAdvancedFTElasticPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
         policyParams.put(FeedPolicyAccessor.ELASTIC, "true");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+        policyParams.put(FeedPolicyAccessor.FLOWCONTROL_ENABLED, "true");
         policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
-
         String description = "Basic Monitored Fault-Tolerant Elastic";
         return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Elastic", description,
                 policyParams);