[ASTERIXDB-2264][ING] Introduce HTTP Feed Adapter

- user-model changes: add the http_adapter for feeds.
- storage format changes: no
- interface changes: no

Details:
1. Added an HTTP feed adapter. Users may create an HTTP feed with the
   following syntax (see also the usage sketch after this list):
    create feed TweetFeed with {
        "adapter-name" : "http_adapter",
        "addresses" : "asterix_nc2:10002,asterix_nc1:10001",
        "address-type" : "NC",
        "type-name" : "TweetMessageType",
        "format" : "adm"
    };
2. Refactored the insert-feed test case so it no longer waits a fixed 10 seconds.
3. Consolidated feed-related constants: adapter names (socket, localfs, Twitter,
   and the new HTTP adapter) now live in ExternalDataConstants as
   KEY_ADAPTER_NAME_* / KEY_ALIAS_ADAPTER_NAME_*, and the socket host/port
   parsing moved into FeedUtils.
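
Usage sketch (not part of the change itself; the host name, dataset name, and
optional property values below are illustrative, based on the new http_feed
test case and HttpServerRecordReader/Factory). After the feed is created it is
connected and started as usual:

    use experiments;
    connect feed TweetFeed to dataset Tweets;
    start feed TweetFeed;

The adapter then listens on each configured NC address and ingests ADM records
from the bodies of HTTP POST requests; any other HTTP method is answered with
405 METHOD NOT ALLOWED. Two optional properties are also recognized: "path"
(the servlet entry point, default "/") and "queue_size" (capacity of the
internal record queue, default 128), for example:

    create feed TweetFeed with {
        -- the "path" and "queue_size" values here are illustrative only
        "adapter-name" : "http_adapter",
        "addresses" : "asterix_nc1:10001",
        "address-type" : "NC",
        "type-name" : "TweetMessageType",
        "format" : "adm",
        "path" : "/tweets",
        "queue_size" : "256"
    };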

Change-Id: I3c197a3df557ecc01f07f0907688c4ea81379e40
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2994
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
new file mode 100644
index 0000000..b6e5aa3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use experiments;
+
+create type TweetMessageType as open {
+    id : string
+};
+
+create dataset Tweets(TweetMessageType) primary key id;
+
+create feed TweetFeed with {
+  "adapter-name" : "http_adapter",
+  "addresses" : "asterix_nc2:10002,asterix_nc1:10001",
+  "address-type" : "NC",
+  "type-name" : "TweetMessageType",
+  "format" : "adm"
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
new file mode 100644
index 0000000..75f92d5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+set `wait-for-completion-feed` "false";
+
+connect feed TweetFeed to dataset Tweets;
+start feed TweetFeed;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
new file mode 100644
index 0000000..38d2959
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2:10002 /
+--body={ "id": "nc2:1", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }{ "id": "nc2:2", "username": "jaysauce82", "location": "", "text": "Not voting for President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
new file mode 100644
index 0000000..b5632d2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1:10001 /
+--body={ "id": "nc1:1", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.sleep.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.sleep.sqlpp
new file mode 100644
index 0000000..2ea09b0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.sleep.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+1500
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.update.sqlpp
new file mode 100644
index 0000000..7a07621
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+stop feed TweetFeed;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.7.query.sqlpp
new file mode 100644
index 0000000..bcddf7a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.7.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+
+select value count(t) from Tweets as t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
new file mode 100644
index 0000000..e4d2615
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.3.adm
new file mode 100644
index 0000000..e440e5c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.3.adm
@@ -0,0 +1 @@
+3
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 52cb6e5..164fbf1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -9036,6 +9036,11 @@
         <expected-error>Function fundv.test_func0@1 is being used. It cannot be dropped</expected-error>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="http_feed">
+        <output-dir compare="Text">http_feed</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="big-object">
     <test-case FilePath="big-object">
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index aff7489..935b1c5 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -429,5 +429,13 @@
       <groupId>javax.xml.bind</groupId>
       <artifactId>jaxb-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-http</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 4c3b7e6..efa12ef 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -38,7 +38,7 @@
      * The data source type indicates whether the data source produces a continuous stream or
      * a set of records
      */
-    public enum DataSourceType {
+    enum DataSourceType {
         STREAM,
         RECORDS
     }
@@ -46,7 +46,7 @@
     /**
      * @return The data source type {STREAM or RECORDS}
      */
-    public DataSourceType getDataSourceType();
+    DataSourceType getDataSourceType();
 
     /**
      * Specifies on which locations this data source is expected to run.
@@ -54,7 +54,7 @@
      * @return
      * @throws AsterixException
      */
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException;
+    AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException;
 
     /**
      * Configure the data parser factory. The passed map contains key value pairs from the
@@ -63,7 +63,7 @@
      * @param configuration
      * @throws AsterixException
      */
-    public void configure(IServiceContext ctx, Map<String, String> configuration)
+    void configure(IServiceContext ctx, Map<String, String> configuration)
             throws AlgebricksException, HyracksDataException;
 
     /**
@@ -71,7 +71,7 @@
      *
      * @return
      */
-    public default boolean isIndexible() {
+    default boolean isIndexible() {
         return false;
     }
 
@@ -84,7 +84,7 @@
      * @return
      * @throws AlgebricksException
      */
-    public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(ICcApplicationContext appCtx,
+    static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(ICcApplicationContext appCtx,
             AlgebricksAbsolutePartitionConstraint constraints, int count) throws AlgebricksException {
         if (constraints == null) {
             IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
new file mode 100644
index 0000000..2979641
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.http;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.HttpServer;
+import org.apache.hyracks.http.server.HttpServerConfig;
+import org.apache.hyracks.http.server.WebManager;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class HttpServerRecordReader implements IRecordReader<char[]> {
+
+    public static final Logger LOGGER = LogManager.getLogger();
+
+    private static final String DEFAULT_ENTRY_POINT = "/";
+    private static final int DEFAULT_QUEUE_SIZE = 128;
+    private LinkedBlockingQueue<String> inputQ;
+    private GenericRecord<char[]> record;
+    private boolean closed = false;
+    private WebManager webManager;
+    private HttpServer webServer;
+
+    public HttpServerRecordReader(int port, String entryPoint, int queueSize, HttpServerConfig httpServerConfig)
+            throws Exception {
+        this.inputQ = new LinkedBlockingQueue<>(queueSize > 0 ? queueSize : DEFAULT_QUEUE_SIZE);
+        this.record = new GenericRecord<>();
+        webManager = new WebManager();
+        webServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), port, httpServerConfig);
+        webServer.addServlet(new HttpFeedServlet(webServer.ctx(),
+                new String[] { entryPoint == null ? DEFAULT_ENTRY_POINT : entryPoint }, inputQ));
+        webManager.add(webServer);
+        webManager.start();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !closed;
+    }
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        String srecord = inputQ.poll();
+        if (srecord == null) {
+            return null;
+        }
+        record.set(srecord.toCharArray());
+        return record;
+    }
+
+    @Override
+    public boolean stop() {
+        try {
+            close();
+        } catch (Exception e) {
+            LOGGER.error(e);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        // do nothing
+    }
+
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) {
+        // do nothing
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (!closed) {
+                webManager.stop();
+                closed = true;
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    private class HttpFeedServlet extends AbstractServlet {
+
+        private LinkedBlockingQueue<String> inputQ;
+
+        private int splitIntoRecords(String admData) throws InterruptedException {
+            int p = 0;
+            int lvlCtr = 0;
+            int recordCtr = 0;
+            boolean inRecord = false;
+            char[] charBuff = admData.toCharArray();
+            for (int iter1 = 0; iter1 < charBuff.length; iter1++) {
+                if (charBuff[iter1] == '{') {
+                    if (!inRecord) {
+                        p = iter1;
+                        inRecord = true;
+                    }
+                    lvlCtr++;
+                } else if (charBuff[iter1] == '}') {
+                    lvlCtr--;
+                }
+                if (lvlCtr == 0) {
+                    if (inRecord) {
+                        inputQ.put(admData.substring(p, iter1 + 1) + '\n');
+                        recordCtr++;
+                        inRecord = false;
+                    }
+                    p = iter1;
+                }
+            }
+            return recordCtr;
+        }
+
+        public HttpFeedServlet(ConcurrentMap<String, Object> ctx, String[] paths, LinkedBlockingQueue<String> inputQ) {
+            super(ctx, paths);
+            this.inputQ = inputQ;
+        }
+
+        private int doPost(IServletRequest request) throws InterruptedException {
+            return splitIntoRecords(request.getHttpRequest().content().toString(StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public void handle(IServletRequest request, IServletResponse response) {
+            PrintWriter responseWriter = response.writer();
+            if (request.getHttpRequest().method() == HttpMethod.POST) {
+                try {
+                    responseWriter.write(String.valueOf(doPost(request)));
+                    response.setStatus(HttpResponseStatus.OK);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    LOGGER.log(Level.INFO, "exception thrown for {}", request, e);
+                    response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                    responseWriter.write(e.toString());
+                }
+            } else {
+                response.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED);
+            }
+            responseWriter.flush();
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
new file mode 100644
index 0000000..e258dce
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.http;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.http.server.HttpServerConfigBuilder;
+
+public class HttpServerRecordReaderFactory implements IRecordReaderFactory<char[]> {
+
+    private static final String KEY_CONFIGURATION_ADDRESSES = "addresses";
+    private static final String KEY_CONFIGURATION_PATH = "path";
+    private static final String KEY_CONFIGURATION_QUEUE_SIZE = "queue_size";
+
+    private static final List<String> recordReaderNames =
+            Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_HTTP);
+
+    private String entryPoint;
+    private String addrValue;
+    private int queueSize;
+    private Map<String, String> configurations;
+    private List<Pair<String, Integer>> serverAddrs;
+
+    @Override
+    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        try {
+            return new HttpServerRecordReader(serverAddrs.get(partition).getRight(), entryPoint, queueSize,
+                    HttpServerConfigBuilder.createDefault());
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return char[].class;
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return FeedUtils.addressToAbsolutePartitionConstraints(serverAddrs);
+    }
+
+    private String getConfigurationValue(String key, boolean required) throws CompilationException {
+        String value = configurations.get(key);
+        if (value == null && required) {
+            throw new CompilationException("Required configuration missing: " + key);
+        }
+        return value;
+    }
+
+    @Override
+    public void configure(IServiceContext ctx, Map<String, String> configuration) throws AlgebricksException {
+        this.configurations = configuration;
+        // necessary configs
+        addrValue = getConfigurationValue(KEY_CONFIGURATION_ADDRESSES, true);
+        serverAddrs = FeedUtils.extractHostsPorts(getConfigurationValue(ExternalDataConstants.KEY_MODE, true), ctx,
+                addrValue);
+        // optional configs
+        String queueSizeStr = getConfigurationValue(KEY_CONFIGURATION_QUEUE_SIZE, false);
+        queueSize = queueSizeStr == null ? 0 : Integer.valueOf(queueSizeStr);
+        entryPoint = getConfigurationValue(KEY_CONFIGURATION_PATH, false);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index f8cf648..076842e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -46,9 +46,10 @@
     protected IInputStreamFactory streamFactory;
     protected Map<String, String> configuration;
     protected Class recordReaderClazz;
-    private static final List<String> recordReaderNames = Collections.unmodifiableList(
-            Arrays.asList(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, ExternalDataConstants.ALIAS_SOCKET_ADAPTER,
-                    ExternalDataConstants.SOCKET, ExternalDataConstants.STREAM_SOCKET_CLIENT));
+    private static final List<String> recordReaderNames =
+            Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER,
+                    ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET, ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET,
+                    ExternalDataConstants.STREAM_SOCKET_CLIENT));
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -69,8 +70,8 @@
         String reader = config.get(ExternalDataConstants.KEY_READER);
         if (reader.equals(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER)) {
             streamFactory = new LocalFSInputStreamFactory();
-        } else if (reader.equals(ExternalDataConstants.ALIAS_SOCKET_ADAPTER)
-                || reader.equals(ExternalDataConstants.SOCKET)) {
+        } else if (reader.equals(ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET)
+                || reader.equals(ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET)) {
             streamFactory = new SocketServerInputStreamFactory();
         } else if (reader.equals(ExternalDataConstants.STREAM_SOCKET_CLIENT)) {
             streamFactory = new SocketClientInputStreamFactory();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 8774762..a31e0da 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -55,10 +55,10 @@
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
     private transient IServiceContext serviceCtx;
 
-    private static final List<String> recordReaderNames =
-            Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.READER_TWITTER_PULL,
-                    ExternalDataConstants.READER_TWITTER_PUSH, ExternalDataConstants.READER_PUSH_TWITTER,
-                    ExternalDataConstants.READER_PULL_TWITTER, ExternalDataConstants.READER_USER_STREAM_TWITTER));
+    private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList(
+            ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_PULL, ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_PUSH,
+            ExternalDataConstants.KEY_ADAPTER_NAME_PUSH_TWITTER, ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER,
+            ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_USER_STREAM));
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -98,7 +98,8 @@
             throw new AsterixException(builder.toString());
         }
 
-        if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_PULL_TWITTER)) {
+        if (configuration.get(ExternalDataConstants.KEY_READER)
+                .equals(ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER)) {
             if (configuration.get(SearchAPIConstants.QUERY) == null) {
                 throw new AsterixException(
                         "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
@@ -131,12 +132,12 @@
             throws HyracksDataException {
         IRecordReader<? extends String> recordReader;
         switch (configuration.get(ExternalDataConstants.KEY_READER)) {
-            case ExternalDataConstants.READER_PULL_TWITTER:
+            case ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER:
                 recordReader = new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
                         configuration.get(SearchAPIConstants.QUERY),
                         Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
                 break;
-            case ExternalDataConstants.READER_PUSH_TWITTER:
+            case ExternalDataConstants.KEY_ADAPTER_NAME_PUSH_TWITTER:
                 FilterQuery query;
                 try {
                     query = TwitterUtil.getFilterQuery(configuration);
@@ -149,7 +150,7 @@
                     throw HyracksDataException.create(e);
                 }
                 break;
-            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+            case ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_USER_STREAM:
                 recordReader = new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
                         TwitterUtil.getUserTweetsListener());
                 break;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index 1f1fa5c..ac3ac42 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -19,28 +19,20 @@
 package org.apache.asterix.external.input.stream.factory;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.input.stream.SocketServerInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.runtime.utils.RuntimeUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -49,66 +41,14 @@
 
     private static final long serialVersionUID = 1L;
     private List<Pair<String, Integer>> sockets;
-    private Mode mode = Mode.IP;
-
-    public static enum Mode {
-        NC,
-        IP
-    }
 
     @Override
-    public void configure(IServiceContext serviceCtx, Map<String, String> configuration)
-            throws AsterixException, CompilationException {
+    public void configure(IServiceContext serviceCtx, Map<String, String> configuration) throws CompilationException {
         try {
-            sockets = new ArrayList<>();
-            String modeValue = configuration.get(ExternalDataConstants.KEY_MODE);
-            if (modeValue != null) {
-                mode = Mode.valueOf(modeValue.trim().toUpperCase());
-            }
-            String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
-            if (socketsValue == null) {
-                throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
-            }
-            Map<InetAddress, Set<String>> ncMap;
-            ncMap = RuntimeUtils.getNodeControllerMap((ICcApplicationContext) serviceCtx.getApplicationContext());
-            List<String> ncs =
-                    RuntimeUtils.getAllNodeControllers((ICcApplicationContext) serviceCtx.getApplicationContext());
-            String[] socketsArray = socketsValue.split(",");
-            Random random = new Random();
-            for (String socket : socketsArray) {
-                String[] socketTokens = socket.split(":");
-                String host = socketTokens[0].trim();
-                int port = Integer.parseInt(socketTokens[1].trim());
-                Pair<String, Integer> p = null;
-                switch (mode) {
-                    case IP:
-                        Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
-                        if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
-                            throw new CompilationException(
-                                    ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC, "host", host,
-                                    StringUtils.join(ncMap.keySet(), ", "));
-                        }
-                        String[] ncArray = ncsOnIp.toArray(new String[] {});
-                        String nc = ncArray[random.nextInt(ncArray.length)];
-                        p = new Pair<>(nc, port);
-                        break;
-
-                    case NC:
-                        p = new Pair<>(host, port);
-                        if (!ncs.contains(host)) {
-                            throw new CompilationException(
-                                    ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC, "NC", host,
-                                    StringUtils.join(ncs, ", "));
-
-                        }
-                        break;
-                }
-                sockets.add(p);
-            }
+            sockets = FeedUtils.extractHostsPorts(configuration.get(ExternalDataConstants.KEY_MODE), serviceCtx,
+                    configuration.get(ExternalDataConstants.KEY_SOCKETS));
         } catch (CompilationException e) {
             throw e;
-        } catch (HyracksDataException | UnknownHostException e) {
-            throw new AsterixException(e);
         } catch (Exception e) {
             throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
         }
@@ -121,7 +61,7 @@
             Pair<String, Integer> socket = sockets.get(partition);
             ServerSocket server;
             server = new ServerSocket();
-            server.bind(new InetSocketAddress(socket.second));
+            server.bind(new InetSocketAddress(socket.getRight()));
             return new SocketServerInputStream(server);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
@@ -130,11 +70,7 @@
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        List<String> locations = new ArrayList<>();
-        for (Pair<String, Integer> socket : sockets) {
-            locations.add(socket.first);
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
+        return FeedUtils.addressToAbsolutePartitionConstraints(sockets);
     }
 
     public List<Pair<String, Integer>> getSockets() {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index c9dafbc..8024dc4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -72,11 +72,11 @@
                     ExternalDataUtils.createExternalInputStreamFactory(libraryManager, dataverse, streamSource);
         } else {
             switch (streamSource) {
-                case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
+                case ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS:
                     streamSourceFactory = new LocalFSInputStreamFactory();
                     break;
-                case ExternalDataConstants.SOCKET:
-                case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
+                case ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET:
+                case ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET:
                     streamSourceFactory = new SocketServerInputStreamFactory();
                     break;
                 case ExternalDataConstants.STREAM_SOCKET_CLIENT:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 494efb5..5a04e33 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -101,6 +101,20 @@
     public static final String KEY_HTTP_PROXY_PORT = "http-proxy-port";
     public static final String KEY_HTTP_PROXY_USER = "http-proxy-user";
     public static final String KEY_HTTP_PROXY_PASSWORD = "http-proxy-password";
+
+    /**
+     *  Keys for adapter name
+     **/
+    public static final String KEY_ADAPTER_NAME_TWITTER_PUSH = "twitter_push";
+    public static final String KEY_ADAPTER_NAME_PUSH_TWITTER = "push_twitter";
+    public static final String KEY_ADAPTER_NAME_TWITTER_PULL = "twitter_pull";
+    public static final String KEY_ADAPTER_NAME_PULL_TWITTER = "pull_twitter";
+    public static final String KEY_ADAPTER_NAME_TWITTER_USER_STREAM = "twitter_user_stream";
+    public static final String KEY_ADAPTER_NAME_LOCALFS = "localfs";
+    public static final String KEY_ADAPTER_NAME_SOCKET = "socket";
+    public static final String KEY_ALIAS_ADAPTER_NAME_SOCKET = "socket_adapter";
+    public static final String KEY_ADAPTER_NAME_HTTP = "http_adapter";
+
     /**
      * HDFS class names
      */
@@ -122,11 +136,6 @@
      * Builtin record readers
      */
     public static final String READER_HDFS = "hdfs";
-    public static final String READER_TWITTER_PUSH = "twitter_push";
-    public static final String READER_PUSH_TWITTER = "push_twitter";
-    public static final String READER_TWITTER_PULL = "twitter_pull";
-    public static final String READER_PULL_TWITTER = "pull_twitter";
-    public static final String READER_USER_STREAM_TWITTER = "twitter_user_stream";
 
     public static final String CLUSTER_LOCATIONS = "cluster-locations";
     public static final String SCHEDULER = "hdfs-scheduler";
@@ -157,8 +166,6 @@
      * input streams
      */
     public static final String STREAM_HDFS = "hdfs";
-    public static final String STREAM_LOCAL_FILESYSTEM = "localfs";
-    public static final String SOCKET = "socket";
     public static final String STREAM_SOCKET_CLIENT = "socket-client";
 
     /**
@@ -168,13 +175,7 @@
     public static final String ALIAS_LOCALFS_ADAPTER = "localfs";
     public static final String ALIAS_LOCALFS_PUSH_ADAPTER = "push_localfs";
     public static final String ALIAS_HDFS_ADAPTER = "hdfs";
-    public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
     public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client";
-    public static final String ALIAS_RSS_ADAPTER = "rss";
-    public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
-    public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
-    public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
-    public static final String ALIAS_CNN_ADAPTER = "cnn_feed";
     public static final String ALIAS_FEED_WITH_META_ADAPTER = "feed_with_meta";
     public static final String ALIAS_CHANGE_FEED_WITH_META_ADAPTER = "change_feed_with_meta";
     // for testing purposes
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index dc8a8aa..ecaced6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -19,18 +19,29 @@
 package org.apache.asterix.external.util;
 
 import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -44,6 +55,8 @@
 public class FeedUtils {
 
     public static final String FEED_EXTENSION_NAME = "Feed";
+    private static final String FEED_HOST_MODE_NC = "NC";
+    private static final String FEED_HOST_MODE_IP = "IP";
 
     public enum JobType {
         INTAKE,
@@ -134,4 +147,57 @@
         return configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME);
 
     }
+
+    public static List<Pair<String, Integer>> extractHostsPorts(String modeValue, IServiceContext serviceCtx,
+            String hostsValue) throws AlgebricksException {
+        try {
+            List<Pair<String, Integer>> sockets = new ArrayList<>();
+            String mode = modeValue.trim().toUpperCase();
+            if (hostsValue == null) {
+                throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
+            }
+            Map<InetAddress, Set<String>> ncMap =
+                    RuntimeUtils.getNodeControllerMap((ICcApplicationContext) serviceCtx.getApplicationContext());
+            List<String> ncs =
+                    RuntimeUtils.getAllNodeControllers((ICcApplicationContext) serviceCtx.getApplicationContext());
+            String[] socketsArray = hostsValue.split(",");
+            Random random = new Random();
+            for (String socket : socketsArray) {
+                String[] socketTokens = socket.split(":");
+                String host = socketTokens[0].trim();
+                int port = Integer.parseInt(socketTokens[1].trim());
+                Pair<String, Integer> p = null;
+                if (FEED_HOST_MODE_IP.equals(mode)) {
+                    Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
+                    if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
+                        throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC,
+                                FEED_HOST_MODE_IP, host, StringUtils.join(ncMap.keySet(), ", "));
+                    }
+                    String[] ncArray = ncsOnIp.toArray(new String[] {});
+                    String nc = ncArray[random.nextInt(ncArray.length)];
+                    p = Pair.of(nc, port);
+                } else if (FEED_HOST_MODE_NC.equals(mode)) {
+                    p = Pair.of(host, port);
+                    if (!ncs.contains(host)) {
+                        throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC,
+                                FEED_HOST_MODE_NC, host, StringUtils.join(ncs, ", "));
+
+                    }
+                }
+                sockets.add(p);
+            }
+            return sockets;
+        } catch (HyracksDataException | UnknownHostException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    public static AlgebricksAbsolutePartitionConstraint addressToAbsolutePartitionConstraints(
+            List<Pair<String, Integer>> sockets) {
+        List<String> locations = new ArrayList<>();
+        for (Pair<String, Integer> socket : sockets) {
+            locations.add(socket.getLeft());
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 52e28be..0d96658 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -19,3 +19,4 @@
 org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory
 org.apache.asterix.external.input.HDFSDataSourceFactory
 org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
+org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory