Merge "Merge commit '7311b03' from stabilization-f69489"
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
index aff00fc..65ecd8d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -128,4 +128,10 @@
builder.getChars(0, builder.length(), value, 0);
this.size = builder.length();
}
+
+ public void set(String strValue) throws IOException { // overwrite this record's content with the chars of strValue (must be non-null)
+ ensureCapacity(strValue.length()); // helper not visible in this hunk; presumably grows `value` to fit and is the IOException source - TODO confirm
+ strValue.getChars(0, strValue.length(), value, 0); // copy the whole string into `value`, starting at index 0
+ this.size = strValue.length(); // the record's size now equals the string's length
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
index b466614..81ad5ba 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
@@ -57,7 +57,7 @@
public HttpServerRecordReader(int port, String entryPoint, int queueSize, HttpServerConfig httpServerConfig)
throws Exception {
this.inputQ = new LinkedBlockingQueue<>(queueSize > 0 ? queueSize : DEFAULT_QUEUE_SIZE);
- this.record = new CharArrayRecord(0);
+ this.record = new CharArrayRecord();
webManager = new WebManager();
webServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), port, httpServerConfig);
webServer.addServlet(new HttpFeedServlet(webServer.ctx(),
@@ -77,7 +77,7 @@
if (srecord == null) {
return null;
}
- record.set(srecord.toCharArray());
+ record.set(srecord);
return record;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index cc4b7f9..883f0ee 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -39,9 +39,9 @@
private char recordStart;
private char recordEnd;
private int recordNumber = 0;
- private static final List<String> recordReaderFormats =
- Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_ADM,
- ExternalDataConstants.FORMAT_JSON, ExternalDataConstants.FORMAT_SEMISTRUCTURED));
+ private static final List<String> recordReaderFormats = Collections.unmodifiableList(
+ Arrays.asList(ExternalDataConstants.FORMAT_ADM, ExternalDataConstants.FORMAT_JSON_LOWER_CASE,
+ ExternalDataConstants.FORMAT_JSON_UPPER_CASE, ExternalDataConstants.FORMAT_SEMISTRUCTURED));
private static final String REQUIRED_CONFIGS = "";
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index 2cf5396..90662b2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -24,7 +24,7 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -35,7 +35,7 @@
import twitter4j.TwitterException;
import twitter4j.TwitterObjectFactory;
-public class TwitterPullRecordReader implements IRecordReader<String> {
+public class TwitterPullRecordReader implements IRecordReader<char[]> {
private Query query;
private Twitter twitter;
@@ -43,7 +43,7 @@
private QueryResult result;
private int nextTweetIndex = 0;
private long lastTweetIdReceived = 0;
- private GenericRecord<String> record;
+ private CharArrayRecord record;
private boolean stopped = false;
public TwitterPullRecordReader(Twitter twitter, String keywords, int requestInterval) {
@@ -51,7 +51,7 @@
this.requestInterval = requestInterval;
this.query = new Query(keywords);
this.query.setCount(100);
- this.record = new GenericRecord<>();
+ this.record = new CharArrayRecord();
}
@Override
@@ -65,7 +65,7 @@
}
@Override
- public IRawRecord<String> next() throws IOException, InterruptedException {
+ public IRawRecord<char[]> next() throws IOException, InterruptedException {
if (result == null || nextTweetIndex >= result.getTweets().size()) {
Thread.sleep(1000 * requestInterval);
query.setSinceId(lastTweetIdReceived);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 3c63281..0f93914 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -24,17 +24,17 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.TwitterUtil;
import twitter4j.FilterQuery;
import twitter4j.TwitterStream;
-public class TwitterPushRecordReader implements IRecordReader<String> {
+public class TwitterPushRecordReader implements IRecordReader<char[]> {
private LinkedBlockingQueue<String> inputQ;
private TwitterStream twitterStream;
- private GenericRecord<String> record;
+ private CharArrayRecord record;
private boolean closed = false;
public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener,
@@ -60,7 +60,7 @@
}
private void init(TwitterStream twitterStream) {
- record = new GenericRecord<>();
+ record = new CharArrayRecord();
inputQ = new LinkedBlockingQueue<>();
this.twitterStream = twitterStream;
}
@@ -81,7 +81,7 @@
}
@Override
- public IRawRecord<String> next() throws IOException, InterruptedException {
+ public IRawRecord<char[]> next() throws IOException, InterruptedException {
String tweet = inputQ.poll();
if (tweet == null) {
return null;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index a31e0da..43a3816 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -43,7 +43,7 @@
import twitter4j.FilterQuery;
-public class TwitterRecordReaderFactory implements IRecordReaderFactory<String> {
+public class TwitterRecordReaderFactory implements IRecordReaderFactory<char[]> {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LogManager.getLogger();
@@ -128,9 +128,9 @@
}
@Override
- public IRecordReader<? extends String> createRecordReader(IHyracksTaskContext ctx, int partition)
+ public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
- IRecordReader<? extends String> recordReader;
+ IRecordReader<? extends char[]> recordReader;
switch (configuration.get(ExternalDataConstants.KEY_READER)) {
case ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER:
recordReader = new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
@@ -161,8 +161,8 @@
}
@Override
- public Class<? extends String> getRecordClass() {
- return String.class;
+ public Class<? extends char[]> getRecordClass() {
+ return char[].class;
}
private boolean validateConfiguration(Map<String, String> configuration) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
index 0183196..6cf84df 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
@@ -47,7 +47,13 @@
import java.io.IOException;
import java.util.Iterator;
-public class TweetParser extends AbstractDataParser implements IRecordDataParser<String> {
+/**
+ * This class was introduced to parse Twitter data. As Tweets are JSON-formatted records, we can use the JSON parser
+ * to parse them instead of having this dedicated parser. In the future, we could either deprecate this class or add
+ * some Tweet-specific parsing processes to this class.
+ */
+
+public class TweetParser extends AbstractDataParser implements IRecordDataParser<char[]> {
private final IObjectPool<IARecordBuilder, ATypeTag> recordBuilderPool =
new ListObjectPool<>(new RecordBuilderFactory());
private final IObjectPool<IAsterixListBuilder, ATypeTag> listBuilderPool =
@@ -252,12 +258,12 @@
}
@Override
- public void parse(IRawRecord<? extends String> record, DataOutput out) throws HyracksDataException {
+ public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
try {
//TODO get rid of this temporary json
resetPools();
ObjectMapper om = new ObjectMapper();
- writeRecord(om.readTree(record.get()), out, recordType);
+ writeRecord(om.readTree(record.getBytes()), out, recordType);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
index 501aea0..f9d50d3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
@@ -27,6 +27,7 @@
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
import org.apache.asterix.external.parser.JSONDataParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -41,7 +42,8 @@
public class JSONDataParserFactory extends AbstractRecordStreamParserFactory<char[]> {
private static final long serialVersionUID = 1L;
- private static final List<String> PARSER_FORMAT = Collections.unmodifiableList(Arrays.asList("json"));
+ private static final List<String> PARSER_FORMAT = Collections.unmodifiableList(
+ Arrays.asList(ExternalDataConstants.FORMAT_JSON_LOWER_CASE, ExternalDataConstants.FORMAT_JSON_UPPER_CASE));
private static final List<ATypeTag> UNSUPPORTED_TYPES = Collections
.unmodifiableList(Arrays.asList(ATypeTag.MULTISET, ATypeTag.POINT3D, ATypeTag.CIRCLE, ATypeTag.RECTANGLE,
ATypeTag.INTERVAL, ATypeTag.DAYTIMEDURATION, ATypeTag.DURATION, ATypeTag.BINARY));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
index 34f0434..7844924 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
@@ -26,13 +26,15 @@
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordDataParserFactory;
import org.apache.asterix.external.parser.TweetParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-public class TweetParserFactory implements IRecordDataParserFactory<String> {
+public class TweetParserFactory implements IRecordDataParserFactory<char[]> {
private static final long serialVersionUID = 1L;
- private static final List<String> parserFormats = Collections.unmodifiableList(Arrays.asList("twitter-status"));
+ private static final List<String> parserFormats =
+ Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_TWEET));
private ARecordType recordType;
@Override
@@ -46,7 +48,7 @@
}
@Override
- public IRecordDataParser<String> createRecordParser(IHyracksTaskContext ctx) {
+ public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) {
TweetParser dataParser = new TweetParser(recordType);
return dataParser;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/RecordConverterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/RecordConverterFactoryProvider.java
index 77e634f..ea879fd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/RecordConverterFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/RecordConverterFactoryProvider.java
@@ -31,7 +31,7 @@
throws AsterixException {
switch (recordFormat) {
case ExternalDataConstants.FORMAT_ADM:
- case ExternalDataConstants.FORMAT_JSON:
+ case ExternalDataConstants.FORMAT_JSON_LOWER_CASE:
// converter that produces records of adm/json type
switch (format) {
case ExternalDataConstants.FORMAT_CSV:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 5a04e33..729215e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -153,7 +153,8 @@
public static final String FORMAT_HIVE = "hive";
public static final String FORMAT_BINARY = "binary";
public static final String FORMAT_ADM = "adm";
- public static final String FORMAT_JSON = "json";
+ public static final String FORMAT_JSON_LOWER_CASE = "json";
+ public static final String FORMAT_JSON_UPPER_CASE = "JSON";
public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
public static final String FORMAT_TWEET = "twitter-status";
public static final String FORMAT_RSS = "rss";
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java
index 3a9d7a5..6e2c92f 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java
@@ -35,7 +35,7 @@
public void Test() throws AsterixException {
List<String> recordReaderFormats =
Arrays.asList(ExternalDataConstants.FORMAT_LINE_SEPARATED, ExternalDataConstants.FORMAT_ADM,
- ExternalDataConstants.FORMAT_JSON, ExternalDataConstants.FORMAT_SEMISTRUCTURED,
+ ExternalDataConstants.FORMAT_JSON_LOWER_CASE, ExternalDataConstants.FORMAT_SEMISTRUCTURED,
ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_CSV);
Map<String, String> config = new HashMap<>();
for (String format : recordReaderFormats) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java
index 9916fa5..e18c13d 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java
@@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.parser.TweetParser;
import org.apache.asterix.om.types.AOrderedListType;
@@ -64,12 +65,12 @@
new ARecordType("TweetType", new String[] { "id", "geo" }, new IAType[] { AINT64, geoUnionType }, true);
TweetParser parser = new TweetParser(tweetRecordType);
+ CharArrayRecord record = new CharArrayRecord();
List<String> lines = Files.readAllLines(Paths.get(getClass().getResource("/test_tweets.txt").toURI()));
ByteArrayOutputStream is = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(is);
for (int iter1 = 0; iter1 < lines.size(); iter1++) {
- GenericRecord<String> record = new GenericRecord<>();
record.set(lines.get(iter1));
try {
parser.parse(record, output);
@@ -93,9 +94,9 @@
List<String> lines = Files.readAllLines(Paths.get(getClass().getResource("/test_tweets.txt").toURI()));
ByteArrayOutputStream is = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(is);
+ CharArrayRecord record = new CharArrayRecord();
int regularCount = 0;
for (int iter1 = 0; iter1 < lines.size(); iter1++) {
- GenericRecord<String> record = new GenericRecord<>();
record.set(lines.get(iter1));
try {
parser.parse(record, output);