[ASTERIXDB-2264][ING] Introduce Http Feed Adapter
- user-model changes: add http_adapter for feed.
- storage format changes: no
- interface changes: no
Details:
1. Added http feed. User may use following syntax to create a http feed:
create feed TweetFeed with {
"adapter-name" : "http_adapter",
"addresses" : "asterix_nc2:10002,asterix_nc1:10001",
"address-type" : "NC",
"type-name" : "TweetMessageType",
"format" : "adm"
};
2. Refactored insert-feed test case to avoid waiting for 10s.
3. Refactored some constants in feeds.
Change-Id: I3c197a3df557ecc01f07f0907688c4ea81379e40
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2994
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
new file mode 100644
index 0000000..b6e5aa3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use experiments;
+
+create type TweetMessageType as open {
+ id : string
+};
+
+create dataset Tweets(TweetMessageType) primary key id;
+
+create feed TweetFeed with {
+ "adapter-name" : "http_adapter",
+ "addresses" : "asterix_nc2:10002,asterix_nc1:10001",
+ "address-type" : "NC",
+ "type-name" : "TweetMessageType",
+ "format" : "adm"
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
new file mode 100644
index 0000000..75f92d5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+set `wait-for-completion-feed` "false";
+
+connect feed TweetFeed to dataset Tweets;
+start feed TweetFeed;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
new file mode 100644
index 0000000..38d2959
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2:10002 /
+--body={ "id": "nc2:1", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }{ "id": "nc2:2", "username": "jaysauce82", "location": "", "text": "Not voting for President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
new file mode 100644
index 0000000..b5632d2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1:10001 /
+--body={ "id": "nc1:1", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.sleep.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.sleep.sqlpp
new file mode 100644
index 0000000..2ea09b0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.sleep.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+1500
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.update.sqlpp
new file mode 100644
index 0000000..7a07621
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+stop feed TweetFeed;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.7.query.sqlpp
new file mode 100644
index 0000000..bcddf7a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.7.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+
+select value count(t) from Tweets as t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
new file mode 100644
index 0000000..e4d2615
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.3.adm
new file mode 100644
index 0000000..e440e5c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.3.adm
@@ -0,0 +1 @@
+3
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 52cb6e5..164fbf1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -9036,6 +9036,11 @@
<expected-error>Function fundv.test_func0@1 is being used. It cannot be dropped</expected-error>
</compilation-unit>
</test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="http_feed">
+ <output-dir compare="Text">http_feed</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="big-object">
<test-case FilePath="big-object">
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index aff7489..935b1c5 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -429,5 +429,13 @@
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-http</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 4c3b7e6..efa12ef 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -38,7 +38,7 @@
* The data source type indicates whether the data source produces a continuous stream or
* a set of records
*/
- public enum DataSourceType {
+ enum DataSourceType {
STREAM,
RECORDS
}
@@ -46,7 +46,7 @@
/**
* @return The data source type {STREAM or RECORDS}
*/
- public DataSourceType getDataSourceType();
+ DataSourceType getDataSourceType();
/**
* Specifies on which locations this data source is expected to run.
@@ -54,7 +54,7 @@
* @return
* @throws AsterixException
*/
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException;
+ AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException;
/**
* Configure the data parser factory. The passed map contains key value pairs from the
@@ -63,7 +63,7 @@
* @param configuration
* @throws AsterixException
*/
- public void configure(IServiceContext ctx, Map<String, String> configuration)
+ void configure(IServiceContext ctx, Map<String, String> configuration)
throws AlgebricksException, HyracksDataException;
/**
@@ -71,7 +71,7 @@
*
* @return
*/
- public default boolean isIndexible() {
+ default boolean isIndexible() {
return false;
}
@@ -84,7 +84,7 @@
* @return
* @throws AlgebricksException
*/
- public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(ICcApplicationContext appCtx,
+ static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(ICcApplicationContext appCtx,
AlgebricksAbsolutePartitionConstraint constraints, int count) throws AlgebricksException {
if (constraints == null) {
IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
new file mode 100644
index 0000000..2979641
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.http;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.HttpServer;
+import org.apache.hyracks.http.server.HttpServerConfig;
+import org.apache.hyracks.http.server.WebManager;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class HttpServerRecordReader implements IRecordReader<char[]> {
+
+ public static final Logger LOGGER = LogManager.getLogger();
+
+ private static final String DEFAULT_ENTRY_POINT = "/";
+ private static final int DEFAULT_QUEUE_SIZE = 128;
+ private LinkedBlockingQueue<String> inputQ;
+ private GenericRecord<char[]> record;
+ private boolean closed = false;
+ private WebManager webManager;
+ private HttpServer webServer;
+
+ public HttpServerRecordReader(int port, String entryPoint, int queueSize, HttpServerConfig httpServerConfig)
+ throws Exception {
+ this.inputQ = new LinkedBlockingQueue<>(queueSize > 0 ? queueSize : DEFAULT_QUEUE_SIZE);
+ this.record = new GenericRecord<>();
+ webManager = new WebManager();
+ webServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), port, httpServerConfig);
+ webServer.addServlet(new HttpFeedServlet(webServer.ctx(),
+ new String[] { entryPoint == null ? DEFAULT_ENTRY_POINT : entryPoint }, inputQ));
+ webManager.add(webServer);
+ webManager.start();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !closed;
+ }
+
+ @Override
+ public IRawRecord<char[]> next() throws IOException, InterruptedException {
+ String srecord = inputQ.poll();
+ if (srecord == null) {
+ return null;
+ }
+ record.set(srecord.toCharArray());
+ return record;
+ }
+
+ @Override
+ public boolean stop() {
+ try {
+ close();
+ } catch (Exception e) {
+ LOGGER.error(e);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void setController(AbstractFeedDataFlowController controller) {
+ // do nothing
+ }
+
+ @Override
+ public void setFeedLogManager(FeedLogManager feedLogManager) {
+ // do nothing
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (!closed) {
+ webManager.stop();
+ closed = true;
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private class HttpFeedServlet extends AbstractServlet {
+
+ private LinkedBlockingQueue<String> inputQ;
+
+ private int splitIntoRecords(String admData) throws InterruptedException {
+ int p = 0;
+ int lvlCtr = 0;
+ int recordCtr = 0;
+ boolean inRecord = false;
+ char[] charBuff = admData.toCharArray();
+ for (int iter1 = 0; iter1 < charBuff.length; iter1++) {
+ if (charBuff[iter1] == '{') {
+ if (!inRecord) {
+ p = iter1;
+ inRecord = true;
+ }
+ lvlCtr++;
+ } else if (charBuff[iter1] == '}') {
+ lvlCtr--;
+ }
+ if (lvlCtr == 0) {
+ if (inRecord) {
+ inputQ.put(admData.substring(p, iter1 + 1) + '\n');
+ recordCtr++;
+ inRecord = false;
+ }
+ p = iter1;
+ }
+ }
+ return recordCtr;
+ }
+
+ public HttpFeedServlet(ConcurrentMap<String, Object> ctx, String[] paths, LinkedBlockingQueue<String> inputQ) {
+ super(ctx, paths);
+ this.inputQ = inputQ;
+ }
+
+ private int doPost(IServletRequest request) throws InterruptedException {
+ return splitIntoRecords(request.getHttpRequest().content().toString(StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public void handle(IServletRequest request, IServletResponse response) {
+ PrintWriter responseWriter = response.writer();
+ if (request.getHttpRequest().method() == HttpMethod.POST) {
+ try {
+ responseWriter.write(String.valueOf(doPost(request)));
+ response.setStatus(HttpResponseStatus.OK);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.log(Level.INFO, "exception thrown for {}", request, e);
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ responseWriter.write(e.toString());
+ }
+ } else {
+ response.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED);
+ }
+ responseWriter.flush();
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
new file mode 100644
index 0000000..e258dce
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.http;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.http.server.HttpServerConfigBuilder;
+
+public class HttpServerRecordReaderFactory implements IRecordReaderFactory<char[]> {
+
+ private static final String KEY_CONFIGURATION_ADDRESSES = "addresses";
+ private static final String KEY_CONFIGURATION_PATH = "path";
+ private static final String KEY_CONFIGURATION_QUEUE_SIZE = "queue_size";
+
+ private static final List<String> recordReaderNames =
+ Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_HTTP);
+
+ private String entryPoint;
+ private String addrValue;
+ private int queueSize;
+ private Map<String, String> configurations;
+ private List<Pair<String, Integer>> serverAddrs;
+
+ @Override
+ public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ try {
+ return new HttpServerRecordReader(serverAddrs.get(partition).getRight(), entryPoint, queueSize,
+ HttpServerConfigBuilder.createDefault());
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return char[].class;
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return recordReaderNames;
+ }
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ return FeedUtils.addressToAbsolutePartitionConstraints(serverAddrs);
+ }
+
+ private String getConfigurationValue(String key, boolean required) throws CompilationException {
+ String value = configurations.get(key);
+ if (value == null && required) {
+ throw new CompilationException("Required configuration missing: " + key);
+ }
+ return value;
+ }
+
+ @Override
+ public void configure(IServiceContext ctx, Map<String, String> configuration) throws AlgebricksException {
+ this.configurations = configuration;
+ // necessary configs
+ addrValue = getConfigurationValue(KEY_CONFIGURATION_ADDRESSES, true);
+ serverAddrs = FeedUtils.extractHostsPorts(getConfigurationValue(ExternalDataConstants.KEY_MODE, true), ctx,
+ addrValue);
+ // optional configs
+ String queueSizeStr = getConfigurationValue(KEY_CONFIGURATION_QUEUE_SIZE, false);
+ queueSize = queueSizeStr == null ? 0 : Integer.valueOf(queueSizeStr);
+ entryPoint = getConfigurationValue(KEY_CONFIGURATION_PATH, false);
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index f8cf648..076842e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -46,9 +46,10 @@
protected IInputStreamFactory streamFactory;
protected Map<String, String> configuration;
protected Class recordReaderClazz;
- private static final List<String> recordReaderNames = Collections.unmodifiableList(
- Arrays.asList(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, ExternalDataConstants.ALIAS_SOCKET_ADAPTER,
- ExternalDataConstants.SOCKET, ExternalDataConstants.STREAM_SOCKET_CLIENT));
+ private static final List<String> recordReaderNames =
+ Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER,
+ ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET, ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET,
+ ExternalDataConstants.STREAM_SOCKET_CLIENT));
@Override
public DataSourceType getDataSourceType() {
@@ -69,8 +70,8 @@
String reader = config.get(ExternalDataConstants.KEY_READER);
if (reader.equals(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER)) {
streamFactory = new LocalFSInputStreamFactory();
- } else if (reader.equals(ExternalDataConstants.ALIAS_SOCKET_ADAPTER)
- || reader.equals(ExternalDataConstants.SOCKET)) {
+ } else if (reader.equals(ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET)
+ || reader.equals(ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET)) {
streamFactory = new SocketServerInputStreamFactory();
} else if (reader.equals(ExternalDataConstants.STREAM_SOCKET_CLIENT)) {
streamFactory = new SocketClientInputStreamFactory();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 8774762..a31e0da 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -55,10 +55,10 @@
private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
private transient IServiceContext serviceCtx;
- private static final List<String> recordReaderNames =
- Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.READER_TWITTER_PULL,
- ExternalDataConstants.READER_TWITTER_PUSH, ExternalDataConstants.READER_PUSH_TWITTER,
- ExternalDataConstants.READER_PULL_TWITTER, ExternalDataConstants.READER_USER_STREAM_TWITTER));
+ private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList(
+ ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_PULL, ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_PUSH,
+ ExternalDataConstants.KEY_ADAPTER_NAME_PUSH_TWITTER, ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER,
+ ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_USER_STREAM));
@Override
public DataSourceType getDataSourceType() {
@@ -98,7 +98,8 @@
throw new AsterixException(builder.toString());
}
- if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_PULL_TWITTER)) {
+ if (configuration.get(ExternalDataConstants.KEY_READER)
+ .equals(ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER)) {
if (configuration.get(SearchAPIConstants.QUERY) == null) {
throw new AsterixException(
"parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
@@ -131,12 +132,12 @@
throws HyracksDataException {
IRecordReader<? extends String> recordReader;
switch (configuration.get(ExternalDataConstants.KEY_READER)) {
- case ExternalDataConstants.READER_PULL_TWITTER:
+ case ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER:
recordReader = new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
configuration.get(SearchAPIConstants.QUERY),
Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
break;
- case ExternalDataConstants.READER_PUSH_TWITTER:
+ case ExternalDataConstants.KEY_ADAPTER_NAME_PUSH_TWITTER:
FilterQuery query;
try {
query = TwitterUtil.getFilterQuery(configuration);
@@ -149,7 +150,7 @@
throw HyracksDataException.create(e);
}
break;
- case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+ case ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_USER_STREAM:
recordReader = new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
TwitterUtil.getUserTweetsListener());
break;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index 1f1fa5c..ac3ac42 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -19,28 +19,20 @@
package org.apache.asterix.external.input.stream.factory;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.SocketServerInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.runtime.utils.RuntimeUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -49,66 +41,14 @@
private static final long serialVersionUID = 1L;
private List<Pair<String, Integer>> sockets;
- private Mode mode = Mode.IP;
-
- public static enum Mode {
- NC,
- IP
- }
@Override
- public void configure(IServiceContext serviceCtx, Map<String, String> configuration)
- throws AsterixException, CompilationException {
+ public void configure(IServiceContext serviceCtx, Map<String, String> configuration) throws CompilationException {
try {
- sockets = new ArrayList<>();
- String modeValue = configuration.get(ExternalDataConstants.KEY_MODE);
- if (modeValue != null) {
- mode = Mode.valueOf(modeValue.trim().toUpperCase());
- }
- String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
- if (socketsValue == null) {
- throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
- }
- Map<InetAddress, Set<String>> ncMap;
- ncMap = RuntimeUtils.getNodeControllerMap((ICcApplicationContext) serviceCtx.getApplicationContext());
- List<String> ncs =
- RuntimeUtils.getAllNodeControllers((ICcApplicationContext) serviceCtx.getApplicationContext());
- String[] socketsArray = socketsValue.split(",");
- Random random = new Random();
- for (String socket : socketsArray) {
- String[] socketTokens = socket.split(":");
- String host = socketTokens[0].trim();
- int port = Integer.parseInt(socketTokens[1].trim());
- Pair<String, Integer> p = null;
- switch (mode) {
- case IP:
- Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
- if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
- throw new CompilationException(
- ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC, "host", host,
- StringUtils.join(ncMap.keySet(), ", "));
- }
- String[] ncArray = ncsOnIp.toArray(new String[] {});
- String nc = ncArray[random.nextInt(ncArray.length)];
- p = new Pair<>(nc, port);
- break;
-
- case NC:
- p = new Pair<>(host, port);
- if (!ncs.contains(host)) {
- throw new CompilationException(
- ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC, "NC", host,
- StringUtils.join(ncs, ", "));
-
- }
- break;
- }
- sockets.add(p);
- }
+ sockets = FeedUtils.extractHostsPorts(configuration.get(ExternalDataConstants.KEY_MODE), serviceCtx,
+ configuration.get(ExternalDataConstants.KEY_SOCKETS));
} catch (CompilationException e) {
throw e;
- } catch (HyracksDataException | UnknownHostException e) {
- throw new AsterixException(e);
} catch (Exception e) {
throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
}
@@ -121,7 +61,7 @@
Pair<String, Integer> socket = sockets.get(partition);
ServerSocket server;
server = new ServerSocket();
- server.bind(new InetSocketAddress(socket.second));
+ server.bind(new InetSocketAddress(socket.getRight()));
return new SocketServerInputStream(server);
} catch (IOException e) {
throw HyracksDataException.create(e);
@@ -130,11 +70,7 @@
@Override
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- List<String> locations = new ArrayList<>();
- for (Pair<String, Integer> socket : sockets) {
- locations.add(socket.first);
- }
- return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
+ return FeedUtils.addressToAbsolutePartitionConstraints(sockets);
}
public List<Pair<String, Integer>> getSockets() {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index c9dafbc..8024dc4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -72,11 +72,11 @@
ExternalDataUtils.createExternalInputStreamFactory(libraryManager, dataverse, streamSource);
} else {
switch (streamSource) {
- case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
+ case ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS:
streamSourceFactory = new LocalFSInputStreamFactory();
break;
- case ExternalDataConstants.SOCKET:
- case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
+ case ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET:
+ case ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET:
streamSourceFactory = new SocketServerInputStreamFactory();
break;
case ExternalDataConstants.STREAM_SOCKET_CLIENT:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 494efb5..5a04e33 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -101,6 +101,20 @@
public static final String KEY_HTTP_PROXY_PORT = "http-proxy-port";
public static final String KEY_HTTP_PROXY_USER = "http-proxy-user";
public static final String KEY_HTTP_PROXY_PASSWORD = "http-proxy-password";
+
+ /**
+ * Keys for adapter name
+ **/
+ public static final String KEY_ADAPTER_NAME_TWITTER_PUSH = "twitter_push";
+ public static final String KEY_ADAPTER_NAME_PUSH_TWITTER = "push_twitter";
+ public static final String KEY_ADAPTER_NAME_TWITTER_PULL = "twitter_pull";
+ public static final String KEY_ADAPTER_NAME_PULL_TWITTER = "pull_twitter";
+ public static final String KEY_ADAPTER_NAME_TWITTER_USER_STREAM = "twitter_user_stream";
+ public static final String KEY_ADAPTER_NAME_LOCALFS = "localfs";
+ public static final String KEY_ADAPTER_NAME_SOCKET = "socket";
+ public static final String KEY_ALIAS_ADAPTER_NAME_SOCKET = "socket_adapter";
+ public static final String KEY_ADAPTER_NAME_HTTP = "http_adapter";
+
/**
* HDFS class names
*/
@@ -122,11 +136,6 @@
* Builtin record readers
*/
public static final String READER_HDFS = "hdfs";
- public static final String READER_TWITTER_PUSH = "twitter_push";
- public static final String READER_PUSH_TWITTER = "push_twitter";
- public static final String READER_TWITTER_PULL = "twitter_pull";
- public static final String READER_PULL_TWITTER = "pull_twitter";
- public static final String READER_USER_STREAM_TWITTER = "twitter_user_stream";
public static final String CLUSTER_LOCATIONS = "cluster-locations";
public static final String SCHEDULER = "hdfs-scheduler";
@@ -157,8 +166,6 @@
* input streams
*/
public static final String STREAM_HDFS = "hdfs";
- public static final String STREAM_LOCAL_FILESYSTEM = "localfs";
- public static final String SOCKET = "socket";
public static final String STREAM_SOCKET_CLIENT = "socket-client";
/**
@@ -168,13 +175,7 @@
public static final String ALIAS_LOCALFS_ADAPTER = "localfs";
public static final String ALIAS_LOCALFS_PUSH_ADAPTER = "push_localfs";
public static final String ALIAS_HDFS_ADAPTER = "hdfs";
- public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client";
- public static final String ALIAS_RSS_ADAPTER = "rss";
- public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
- public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
- public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
- public static final String ALIAS_CNN_ADAPTER = "cnn_feed";
public static final String ALIAS_FEED_WITH_META_ADAPTER = "feed_with_meta";
public static final String ALIAS_CHANGE_FEED_WITH_META_ADAPTER = "change_feed_with_meta";
// for testing purposes
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index dc8a8aa..ecaced6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -19,18 +19,29 @@
package org.apache.asterix.external.util;
import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -44,6 +55,8 @@
public class FeedUtils {
public static final String FEED_EXTENSION_NAME = "Feed";
+ private static final String FEED_HOST_MODE_NC = "NC";
+ private static final String FEED_HOST_MODE_IP = "IP";
public enum JobType {
INTAKE,
@@ -134,4 +147,57 @@
return configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME);
}
+
+ public static List<Pair<String, Integer>> extractHostsPorts(String modeValue, IServiceContext serviceCtx,
+ String hostsValue) throws AlgebricksException {
+ try {
+ List<Pair<String, Integer>> sockets = new ArrayList<>();
+ String mode = modeValue.trim().toUpperCase();
+ if (hostsValue == null) {
+ throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
+ }
+ Map<InetAddress, Set<String>> ncMap =
+ RuntimeUtils.getNodeControllerMap((ICcApplicationContext) serviceCtx.getApplicationContext());
+ List<String> ncs =
+ RuntimeUtils.getAllNodeControllers((ICcApplicationContext) serviceCtx.getApplicationContext());
+ String[] socketsArray = hostsValue.split(",");
+ Random random = new Random();
+ for (String socket : socketsArray) {
+ String[] socketTokens = socket.split(":");
+ String host = socketTokens[0].trim();
+ int port = Integer.parseInt(socketTokens[1].trim());
+ Pair<String, Integer> p = null;
+ if (FEED_HOST_MODE_IP.equals(mode)) {
+ Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
+ if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
+ throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC,
+ FEED_HOST_MODE_IP, host, StringUtils.join(ncMap.keySet(), ", "));
+ }
+ String[] ncArray = ncsOnIp.toArray(new String[] {});
+ String nc = ncArray[random.nextInt(ncArray.length)];
+ p = Pair.of(nc, port);
+ } else if (FEED_HOST_MODE_NC.equals(mode)) {
+ p = Pair.of(host, port);
+ if (!ncs.contains(host)) {
+ throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC,
+ FEED_HOST_MODE_NC, host, StringUtils.join(ncs, ", "));
+
+ }
+ }
+ sockets.add(p);
+ }
+ return sockets;
+ } catch (HyracksDataException | UnknownHostException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ public static AlgebricksAbsolutePartitionConstraint addressToAbsolutePartitionConstraints(
+ List<Pair<String, Integer>> sockets) {
+ List<String> locations = new ArrayList<>();
+ for (Pair<String, Integer> socket : sockets) {
+ locations.add(socket.getLeft());
+ }
+ return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 52e28be..0d96658 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -19,3 +19,4 @@
org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory
org.apache.asterix.external.input.HDFSDataSourceFactory
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
+org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory