ASTERIXDB-1360 Fix Error Message for Unknown Format
This change includes some refactoring for external
data. This refactoring makes the code less error-prone
and separates data source selection from data parser
selection. It also fixes issue ASTERIXDB-1366 and adds
a test case for it.
Change-Id: Ib4aac833e30bd7c5a7706f5c8116383c2362c964
Reviewed-on: https://asterix-gerrit.ics.uci.edu/767
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index 5cd490a..b98618d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -71,26 +71,17 @@
*/
public static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed primaryFeed,
AqlMetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
-
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
IAdapterFactory adapterFactory = null;
IOperatorDescriptor feedIngestor;
AlgebricksPartitionConstraint ingesterPc;
-
- try {
- Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = metadataProvider
- .buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
- feedIngestor = t.first;
- ingesterPc = t.second;
- adapterFactory = t.third;
- } catch (AlgebricksException e) {
- e.printStackTrace();
- throw new AsterixException(e);
- }
-
+ Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = metadataProvider
+ .buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
+ feedIngestor = t.first;
+ ingesterPc = t.second;
+ adapterFactory = t.third;
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc);
-
NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc);
spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0);
@@ -252,7 +243,7 @@
private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
JobSpecification jobSpec, FeedConnectionId feedConenctionId, List<String> locations,
FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, FeedId sourceFeedId)
- throws AlgebricksException {
+ throws AlgebricksException {
IFeedMessage feedMessage = new EndFeedMessage(feedConenctionId, sourceFeedRuntimeType, sourceFeedId,
completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations);
diff --git a/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
index 5a7294c..4ffc5a7 100644
--- a/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
@@ -36,6 +36,11 @@
create external dataset EmployeeDataset(EmployeeType)
using hdfs
-(("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/external-indexing-test.rc"),("input-format"="rc-input-format"),("format"="binary"),("parser"="hive-parser"),("hive-serde"="org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"));
+(("hdfs"="hdfs://127.0.0.1:31888"),
+("path"="/asterix/external-indexing-test.rc"),
+("input-format"="rc-input-format"),
+("format"="hdfs-writable"),
+("parser"="hive-parser"),
+("hive-serde"="org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"));
create index EmployeeAgeIdx on EmployeeDataset(age);
diff --git a/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.1.ddl.aql
new file mode 100644
index 0000000..7c668e4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.1.ddl.aql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Testing an external dataset with invalid adapter format parameter value
+ * Expected result: fail - Unknown data format.
+ */
+
+drop dataverse temp if exists;
+create dataverse temp
+use dataverse temp;
+
+create type test as closed {
+ id: int32
+};
+
+create external dataset testds (test) using localfs(
+("path"="asterix_nc1://data/csv/sample_04_quote_error.csv"),
+("format"="add"));
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.2.query.aql b/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.2.query.aql
new file mode 100644
index 0000000..438e0b6
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/external/invalid-format/invalid-format.2.query.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Testing an external dataset with invalid adapter format parameter value
+ * Expected result: fail - Unknown data format.
+ */
+
+use dataverse temp;
+
+for $i in dataset testds
+return $i;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.1.ddl.aql
new file mode 100644
index 0000000..d7827c5
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.1.ddl.aql
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a twitter feed with missing parameters
+ * Expected Res : Failure
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+create type TwitterUser if not exists as open{
+screen_name: string,
+language: string,
+friends_count: int32,
+status_count: int32,
+name: string,
+followers_count: int32
+};
+
+create type Tweet if not exists as open{
+id: string,
+user: TwitterUser,
+latitude:double,
+longitude:double,
+created_at:string,
+message_text:string
+};
+
+create dataset Tweets (Tweet)
+primary key id;
+
+create feed TwitterFeed using push_twitter(
+("type-name"="Tweet"),
+("format"="twitter-status"),
+("consumer.key"="************"),
+("access.token"="**********"),
+("access.token.secret"="*************"));
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.2.update.aql
new file mode 100644
index 0000000..6712969
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/twitter-feed/twitter-feed.2.update.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a twitter feed with missing parameters
+ * Expected Res : Failure
+ */
+
+use dataverse feeds;
+connect feed TwitterFeed to dataset Tweets;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
index 000ef5b..4e306b3 100644
--- a/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
@@ -37,7 +37,7 @@
create external dataset EmployeeDataset(EmployeeType)
using adapter
(("reader"="hdfs"),
-("parser"="delimited-text"),
+("format"="delimited-text"),
("hdfs"="hdfs://127.0.0.1:31888"),
("path"="/asterix/large-record.txt"),
("input-format"="text-input-format"),
diff --git a/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-indexing/rc-format/rc-format.1.ddl.sqlpp b/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-indexing/rc-format/rc-format.1.ddl.sqlpp
index b14f3cb..4736dee 100644
--- a/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-indexing/rc-format/rc-format.1.ddl.sqlpp
+++ b/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-indexing/rc-format/rc-format.1.ddl.sqlpp
@@ -37,7 +37,13 @@
age : int64
}
-create external table EmployeeDataset(EmployeeType) using "hdfs"(("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/external-indexing-test.rc"),("input-format"="rc-input-format"),("format"="binary"),("parser"="hive-parser"),("hive-serde"="org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"));
+create external table EmployeeDataset(EmployeeType) using "hdfs"(
+("hdfs"="hdfs://127.0.0.1:31888"),
+("path"="/asterix/external-indexing-test.rc"),
+("input-format"="rc-input-format"),
+("format"="hdfs-writable"),
+("parser"="hive-parser"),
+("hive-serde"="org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"));
create index EmployeeAgeIdx on EmployeeDataset (age) type btree;
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 3a5140c..40ba82d 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -28,6 +28,46 @@
ResultOffsetPath="results"
QueryOffsetPath="queries"
QueryFileExtension=".aql">
+ <test-group name="external">
+ <test-case FilePath="external">
+ <compilation-unit name="invalid-format">
+ <output-dir compare="Text">invalid-format</output-dir>
+ <expected-error>Unknown format</expected-error>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+ <test-group name="external-indexing">
+ <test-case FilePath="external-indexing">
+ <compilation-unit name="text-format">
+ <output-dir compare="Text">text-format</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-indexing">
+ <compilation-unit name="sequence-format">
+ <output-dir compare="Text">sequence-format</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-indexing">
+ <compilation-unit name="rc-format">
+ <output-dir compare="Text">rc-format</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-indexing">
+ <compilation-unit name="rtree-index">
+ <output-dir compare="Text">rtree-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-indexing">
+ <compilation-unit name="leftouterjoin">
+ <output-dir compare="Text">leftouterjoin</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-indexing">
+ <compilation-unit name="leftouterjoin-rtree">
+ <output-dir compare="Text">leftouterjoin-rtree</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
<test-group name="external-library">
<test-case FilePath="external-library">
<compilation-unit name="typed_adapter">
@@ -52,6 +92,12 @@
</test-group>
<test-group name="feeds">
<test-case FilePath="feeds">
+ <compilation-unit name="twitter-feed">
+ <output-dir compare="Text">twitter-feed</output-dir>
+ <expected-error>One or more parameters are missing from adapter configuration</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
<compilation-unit name="feed-with-external-parser">
<output-dir compare="Text">feed-with-external-parser</output-dir>
</compilation-unit>
@@ -6256,13 +6302,13 @@
<test-case FilePath="load">
<compilation-unit name="issue14_query">
<output-dir compare="Text">issue14_query</output-dir>
- <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
+ <expected-error>The parameter format must be specified</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="load">
<compilation-unit name="issue315_query">
<output-dir compare="Text">none</output-dir>
- <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
+ <expected-error>The parameter format must be specified</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="load">
@@ -6390,38 +6436,6 @@
</compilation-unit>
</test-case>
</test-group>
- <test-group name="external-indexing">
- <test-case FilePath="external-indexing">
- <compilation-unit name="text-format">
- <output-dir compare="Text">text-format</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="external-indexing">
- <compilation-unit name="sequence-format">
- <output-dir compare="Text">sequence-format</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="external-indexing">
- <compilation-unit name="rc-format">
- <output-dir compare="Text">rc-format</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="external-indexing">
- <compilation-unit name="rtree-index">
- <output-dir compare="Text">rtree-index</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="external-indexing">
- <compilation-unit name="leftouterjoin">
- <output-dir compare="Text">leftouterjoin</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="external-indexing">
- <compilation-unit name="leftouterjoin-rtree">
- <output-dir compare="Text">leftouterjoin-rtree</output-dir>
- </compilation-unit>
- </test-case>
- </test-group>
<test-group name="temporal">
<test-case FilePath="temporal">
<compilation-unit name="overlap_bins_gby_3">
diff --git a/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index c343570..f5bb7ce 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6233,13 +6233,13 @@
<test-case FilePath="load">
<compilation-unit name="issue14_query">
<output-dir compare="Text">issue14_query</output-dir>
- <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
+ <expected-error>The parameter format must be specified</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="load">
<compilation-unit name="issue315_query">
<output-dir compare="Text">none</output-dir>
- <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
+ <expected-error>The parameter format must be specified</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="load">
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index a03ad1a..041f706 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -120,9 +120,14 @@
this.metaType = metaType;
this.configuration = configuration;
dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
-
dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
- prepare();
+ if (dataSourceFactory.isIndexible() && (files != null)) {
+ ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
+ }
+ dataSourceFactory.configure(configuration);
+ dataParserFactory.setRecordType(recordType);
+ dataParserFactory.setMetaType(metaType);
+ dataParserFactory.configure(configuration);
ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory);
configureFeedLogManager();
nullifyExternalObjects();
@@ -145,16 +150,6 @@
}
}
- private void prepare() throws AsterixException {
- if (dataSourceFactory.isIndexible() && (files != null)) {
- ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
- }
- dataSourceFactory.configure(configuration);
- dataParserFactory.setRecordType(recordType);
- dataParserFactory.setMetaType(metaType);
- dataParserFactory.configure(configuration);
- }
-
@Override
public ARecordType getAdapterOutputType() {
return recordType;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index de185e0..529977a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -25,15 +25,20 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.api.IIndexibleExternalDataSource;
-import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingScheduler;
+import org.apache.asterix.external.input.record.reader.IndexingStreamRecordReader;
import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
import org.apache.asterix.external.input.stream.HDFSInputStream;
import org.apache.asterix.external.provider.ExternalIndexerProvider;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.io.Writable;
@@ -48,8 +53,7 @@
import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
import org.apache.hyracks.hdfs.scheduler.Scheduler;
-public class HDFSDataSourceFactory
- implements IInputStreamFactory, IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
+public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
protected static final long serialVersionUID = 1L;
protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
@@ -69,6 +73,7 @@
private JobConf conf;
private InputSplit[] inputSplits;
private String nodeName;
+ private Format format;
@Override
public void configure(Map<String, String> configuration) throws AsterixException {
@@ -94,10 +99,14 @@
inputSplitsFactory = new InputSplitsFactory(inputSplits);
read = new boolean[readSchedule.length];
Arrays.fill(read, false);
- if (!ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM)) {
+ String formatString = configuration.get(ExternalDataConstants.KEY_FORMAT);
+ if (formatString == null || formatString.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)) {
RecordReader<?, ?> reader = conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
this.recordClass = reader.createValue().getClass();
reader.close();
+ } else {
+ format = StreamRecordReaderProvider.getReaderFormat(configuration);
+ this.recordClass = char[].class;
}
} catch (IOException e) {
throw new AsterixException(e);
@@ -117,8 +126,8 @@
* 1. when target files are not null, it generates a file aware input stream that validate
* against the files
* 2. if the data is binary, it returns a generic reader */
- @Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+ public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition, IExternalIndexer indexer)
+ throws HyracksDataException {
try {
if (!configured) {
conf = confFactory.getConf();
@@ -126,7 +135,7 @@
nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
configured = true;
}
- return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files);
+ return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files, indexer);
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -170,15 +179,34 @@
return ExternalDataUtils.getDataSourceType(configuration);
}
+ /**
+ * HDFS Datasource is a special case in two ways:
+ * 1. It supports indexing.
+ * 2. It returns input as a set of writable objects that we sometimes internally transform into a byte stream
+ * Hence, it can produce:
+ * 1. StreamRecordReader: When we transform the input into a byte stream.
+ * 2. Indexing Stream Record Reader: When we transform the input into a byte stream and perform indexing.
+ * 3. HDFS Record Reader: When we simply pass the Writable object as it is to the parser.
+ */
@Override
- public IRecordReader<? extends Writable> createRecordReader(IHyracksTaskContext ctx, int partition)
+ public IRecordReader<? extends Object> createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
try {
+ IExternalIndexer indexer = files == null ? null : ExternalIndexerProvider.getIndexer(configuration);
+ if (format != null) {
+ StreamRecordReader streamReader = StreamRecordReaderProvider.createRecordReader(format,
+ createInputStream(ctx, partition, indexer), configuration);
+ if (indexer != null) {
+ return new IndexingStreamRecordReader(streamReader, indexer);
+ } else {
+ return streamReader;
+ }
+ }
JobConf conf = confFactory.getConf();
InputSplit[] inputSplits = inputSplitsFactory.getSplits();
String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
return new HDFSRecordReader<Object, Writable>(read, inputSplits, readSchedule, nodeName, conf, files,
- files == null ? null : ExternalIndexerProvider.getIndexer(configuration));
+ indexer);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
new file mode 100644
index 0000000..2c2dd98
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+
+public class IndexingStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
+
+ private StreamRecordReader reader;
+ private IExternalIndexer indexer;
+
+ public IndexingStreamRecordReader(StreamRecordReader reader, IExternalIndexer indexer) {
+ this.reader = reader;
+ this.indexer = indexer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public IExternalIndexer getIndexer() {
+ return indexer;
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return reader.hasNext();
+ }
+
+ @Override
+ public IRawRecord<char[]> next() throws IOException, InterruptedException {
+ return reader.next();
+ }
+
+ @Override
+ public boolean stop() {
+ return reader.stop();
+ }
+
+ @Override
+ public void setController(AbstractFeedDataFlowController controller) {
+ reader.setController(controller);
+ }
+
+ @Override
+ public void setFeedLogManager(FeedLogManager feedLogManager) {
+ reader.setFeedLogManager(feedLogManager);
+ }
+
+ @Override
+ public List<ExternalFile> getSnapshot() {
+ return null;
+ }
+
+ @Override
+ public int getCurrentSplitIndex() {
+ return -1;
+ }
+
+ @Override
+ public RecordReader<?, ? extends Writable> getReader() {
+ return null;
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ return reader.handleException(th);
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
deleted file mode 100644
index 2c82f47..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexibleExternalDataSource;
-import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.api.IInputStreamFactory;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public abstract class AbstractStreamRecordReaderFactory<T>
- implements IRecordReaderFactory<T>, IIndexibleExternalDataSource {
-
- private static final long serialVersionUID = 1L;
- protected IInputStreamFactory inputStreamFactory;
- protected Map<String, String> configuration;
-
- public AbstractStreamRecordReaderFactory<T> setInputStreamFactoryProvider(
- IInputStreamFactory inputStreamFactory) {
- this.inputStreamFactory = inputStreamFactory;
- return this;
- }
-
- @Override
- public DataSourceType getDataSourceType() {
- return DataSourceType.RECORDS;
- }
-
- @Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
- return inputStreamFactory.getPartitionConstraint();
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws AsterixException {
- this.configuration = configuration;
- inputStreamFactory.configure(configuration);
- }
-
- @Override
- public boolean isIndexible() {
- return inputStreamFactory.isIndexible();
- }
-
- @Override
- public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
- ((IIndexibleExternalDataSource) inputStreamFactory).setSnapshot(files, indexingOp);
- }
-
- @Override
- public boolean isIndexingOp() {
- if (inputStreamFactory.isIndexible()) {
- return ((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp();
- }
- return false;
- }
-
- protected Pair<AsterixInputStream, IExternalIndexer> getStreamAndIndexer(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- AsterixInputStream inputStream = inputStreamFactory.createInputStream(ctx, partition);
- IExternalIndexer indexer = null;
- if (inputStreamFactory.isIndexible()) {
- if (((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp()) {
- indexer = ((IIndexingDatasource) inputStream).getIndexer();
- }
- }
- return new Pair<AsterixInputStream, IExternalIndexer>(inputStream, indexer);
- }
-}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index 6964a82..aa0451a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@ -21,13 +21,12 @@
import java.io.IOException;
import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.util.ExternalDataConstants;
-public class EmptyLineSeparatedRecordReader extends AbstractStreamRecordReader {
+public class EmptyLineSeparatedRecordReader extends StreamRecordReader {
- public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream, IExternalIndexer indexer) {
- super(inputStream, indexer);
+ public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream) {
+ super(inputStream);
}
private boolean prevCharCR;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
deleted file mode 100644
index 063ed11..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class EmptyLineSeparatedRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- final Pair<AsterixInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
- return new EmptyLineSeparatedRecordReader(streamAndIndexer.first, streamAndIndexer.second);
- }
-
- @Override
- public Class<? extends char[]> getRecordClass() {
- return char[].class;
- }
-}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 3089295..8572fc7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -21,19 +21,17 @@
import java.io.IOException;
import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class LineRecordReader extends AbstractStreamRecordReader {
+public class LineRecordReader extends StreamRecordReader {
protected boolean prevCharCR;
protected int newlineLength;
protected int recordNumber = 0;
- public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream, final IExternalIndexer indexer)
- throws HyracksDataException {
- super(stream, indexer);
+ public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream) throws HyracksDataException {
+ super(stream);
try {
if (hasHeader) {
if (hasNext()) {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
deleted file mode 100644
index 4d44001..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
- boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
- Pair<AsterixInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
- if (quoteString != null) {
- return new QuotedLineRecordReader(hasHeader, streamAndIndexer.first, streamAndIndexer.second, quoteString);
- } else {
- return new LineRecordReader(hasHeader, streamAndIndexer.first, streamAndIndexer.second);
- }
- }
-
- @Override
- public Class<? extends char[]> getRecordClass() {
- return char[].class;
- }
-
-}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index abd2952..515e0e5 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -32,9 +32,9 @@
private boolean prevCharEscape;
private boolean inQuote;
- public QuotedLineRecordReader(final boolean hasHeader, final AsterixInputStream stream,
- final IExternalIndexer indexer, final String quoteString) throws HyracksDataException {
- super(hasHeader, stream, indexer);
+ public QuotedLineRecordReader(final boolean hasHeader, final AsterixInputStream stream, final String quoteString)
+ throws HyracksDataException {
+ super(hasHeader, stream);
if ((quoteString == null) || (quoteString.length() != 1)) {
throw new HyracksDataException(ExternalDataExceptionUtils.incorrectParameterMessage(
ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 7339bfd..26ac3cb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -20,13 +20,13 @@
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
+public class SemiStructuredRecordReader extends StreamRecordReader {
private int depth;
private boolean prevCharEscape;
@@ -35,13 +35,13 @@
private char recordEnd;
private int recordNumber = 0;
- public SemiStructuredRecordReader(AsterixInputStream stream, IExternalIndexer indexer, String recStartString,
- String recEndString) throws AsterixException {
- super(stream, indexer);
+ public SemiStructuredRecordReader(AsterixInputStream stream, String recStartString, String recEndString)
+ throws HyracksDataException {
+ super(stream);
// set record opening char
if (recStartString != null) {
if (recStartString.length() != 1) {
- throw new AsterixException(
+ throw new HyracksDataException(
ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recStartString));
}
@@ -52,7 +52,7 @@
// set record ending char
if (recEndString != null) {
if (recEndString.length() != 1) {
- throw new AsterixException(
+ throw new HyracksDataException(
ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recEndString));
}
@@ -67,7 +67,7 @@
}
@Override
- public boolean hasNext() throws Exception {
+ public boolean hasNext() throws IOException {
if (done) {
return false;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
deleted file mode 100644
index 0f50204..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- Pair<AsterixInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
- try {
- return new SemiStructuredRecordReader(streamAndIndexer.first, streamAndIndexer.second,
- configuration.get(ExternalDataConstants.KEY_RECORD_START),
- configuration.get(ExternalDataConstants.KEY_RECORD_END));
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public Class<? extends char[]> getRecordClass() {
- return char[].class;
- }
-}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
similarity index 72%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 7d6c1f3..57ef3ae 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -19,35 +19,27 @@
package org.apache.asterix.external.input.record.reader.stream;
import java.io.IOException;
-import java.util.List;
import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexingDatasource;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.RecordReader;
-public abstract class AbstractStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
+public abstract class StreamRecordReader implements IRecordReader<char[]> {
protected final AsterixInputStreamReader reader;
protected CharArrayRecord record;
protected char[] inputBuffer;
protected int bufferLength = 0;
protected int bufferPosn = 0;
- protected final IExternalIndexer indexer;
protected boolean done = false;
protected FeedLogManager feedLogManager;
- public AbstractStreamRecordReader(AsterixInputStream inputStream, IExternalIndexer indexer) {
+ public StreamRecordReader(AsterixInputStream inputStream) {
this.reader = new AsterixInputStreamReader(inputStream);
- this.indexer = indexer;
record = new CharArrayRecord();
inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
}
@@ -66,11 +58,6 @@
}
@Override
- public IExternalIndexer getIndexer() {
- return indexer;
- }
-
- @Override
public boolean stop() {
try {
reader.stop();
@@ -82,35 +69,22 @@
}
@Override
+ public abstract boolean hasNext() throws IOException;
+
+ @Override
+ public void setFeedLogManager(FeedLogManager feedLogManager) {
+ // Keep the protected feedLogManager field in sync with the reader; the class
+ // still declares it, and without this assignment it would never be set.
+ this.feedLogManager = feedLogManager;
+ reader.setFeedLogManager(feedLogManager);
+ }
+
+ @Override
public void setController(AbstractFeedDataFlowController controller) {
reader.setController(controller);
}
@Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
- this.feedLogManager = feedLogManager;
- reader.setFeedLogManager(feedLogManager);
- }
-
- @Override
public boolean handleException(Throwable th) {
return reader.handleException(th);
}
-
- //TODO: Fix the following method since they don't fit
- //Already the fix is in another local branch
- @Override
- public List<ExternalFile> getSnapshot() {
- return null;
- }
-
- @Override
- public int getCurrentSplitIndex() {
- return -1;
- }
-
- @Override
- public RecordReader<?, Writable> getReader() {
- return null;
- }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
new file mode 100644
index 0000000..f743a3f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> {
+
+ private static final long serialVersionUID = 1L;
+ protected final IInputStreamFactory streamFactory;
+ protected Map<String, String> configuration;
+ protected Format format;
+
+ public StreamRecordReaderFactory(IInputStreamFactory inputStreamFactory) {
+ this.streamFactory = inputStreamFactory;
+ }
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return char[].class;
+ }
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
+ return streamFactory.getPartitionConstraint();
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws AsterixException {
+ this.configuration = configuration;
+ streamFactory.configure(configuration);
+ format = StreamRecordReaderProvider.getReaderFormat(configuration);
+ }
+
+ @Override
+ public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ return StreamRecordReaderProvider.createRecordReader(format, streamFactory.createInputStream(ctx, partition),
+ configuration);
+ }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 7ca185f..541737a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -27,7 +27,6 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.TwitterUtil;
import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
@@ -71,10 +70,10 @@
builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
- builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
+ builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
throw new AsterixException(builder.toString());
}
- if (ExternalDataUtils.isPull(configuration)) {
+ if (TwitterRecordReaderFactory.isTwitterPull(configuration)) {
pull = true;
if (configuration.get(SearchAPIConstants.QUERY) == null) {
throw new AsterixException(
@@ -95,14 +94,20 @@
+ DEFAULT_INTERVAL + ")");
}
}
- } else if (ExternalDataUtils.isPush(configuration)) {
- pull = false;
} else {
- throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
- + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
+ pull = false;
}
}
+ /** Tells whether the configured reader is one of the Twitter pull aliases. */
+ public static boolean isTwitterPull(Map<String, String> configuration) {
+ String reader = configuration.get(ExternalDataConstants.KEY_READER);
+ // Constant-first equals: a configuration with no reader entry yields false
+ // instead of throwing a NullPointerException.
+ return ExternalDataConstants.READER_TWITTER_PULL.equals(reader)
+ || ExternalDataConstants.READER_PULL_TWITTER.equals(reader);
+ }
+
@Override
public boolean isIndexible() {
return false;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
index 063b8fa..997c254 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
@@ -28,7 +28,6 @@
import org.apache.asterix.external.api.IIndexingDatasource;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader;
-import org.apache.asterix.external.provider.ExternalIndexerProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -63,8 +62,8 @@
@SuppressWarnings("unchecked")
public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
- JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot)
- throws IOException, AsterixException {
+ JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot, IExternalIndexer indexer)
+ throws IOException, AsterixException {
this.read = read;
this.inputSplits = inputSplits;
this.readSchedule = readSchedule;
@@ -74,15 +73,13 @@
this.reader = new EmptyRecordReader<Object, Text>();
this.snapshot = snapshot;
this.hdfs = FileSystem.get(conf);
+ this.indexer = indexer;
nextInputSplit();
this.value = new Text();
if (snapshot != null) {
- this.indexer = ExternalIndexerProvider.getIndexer(configuration);
if (currentSplitIndex < snapshot.size()) {
indexer.reset(this);
}
- } else {
- this.indexer = null;
}
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index f8d64e0..0f24f91 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -29,11 +29,10 @@
import org.apache.asterix.external.input.record.reader.RecordWithPKTestReaderFactory;
import org.apache.asterix.external.input.record.reader.kv.KVReaderFactory;
import org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory;
-import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
+import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -53,21 +52,18 @@
}
}
- public static IInputStreamFactory getInputStreamFactory(String streamSource,
- Map<String, String> configuration) throws AsterixException {
+ public static IInputStreamFactory getInputStreamFactory(String streamSource, Map<String, String> configuration)
+ throws AsterixException {
IInputStreamFactory streamSourceFactory;
if (ExternalDataUtils.isExternal(streamSource)) {
String dataverse = ExternalDataUtils.getDataverse(configuration);
streamSourceFactory = ExternalDataUtils.createExternalInputStreamFactory(dataverse, streamSource);
} else {
switch (streamSource) {
- case ExternalDataConstants.STREAM_HDFS:
- streamSourceFactory = new HDFSDataSourceFactory();
- break;
case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
streamSourceFactory = new LocalFSInputStreamFactory();
break;
- case ExternalDataConstants.STREAM_SOCKET:
+ case ExternalDataConstants.SOCKET:
case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
streamSourceFactory = new SocketServerInputStreamFactory();
break;
@@ -89,59 +85,29 @@
if (reader.equals(ExternalDataConstants.EXTERNAL)) {
return ExternalDataUtils.createExternalRecordReaderFactory(configuration);
}
- String parser = configuration.get(ExternalDataConstants.KEY_PARSER);
- IInputStreamFactory inputStreamFactory;
- switch (parser) {
- case ExternalDataConstants.FORMAT_ADM:
- case ExternalDataConstants.FORMAT_JSON:
- case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
- case ExternalDataConstants.FORMAT_LINE_SEPARATED:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new EmptyLineSeparatedRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
- case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
- case ExternalDataConstants.FORMAT_CSV:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
- case ExternalDataConstants.FORMAT_RECORD_WITH_METADATA:
- switch (reader) {
- case ExternalDataConstants.READER_KV:
- return new KVReaderFactory();
- case ExternalDataConstants.READER_KV_TEST:
- return new KVTestReaderFactory();
- }
- }
- String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
- if (format != null) {
- switch (format) {
- case ExternalDataConstants.FORMAT_ADM:
- case ExternalDataConstants.FORMAT_JSON:
- case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
- case ExternalDataConstants.FORMAT_LINE_SEPARATED:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new EmptyLineSeparatedRecordReaderFactory()
- .setInputStreamFactoryProvider(inputStreamFactory);
- case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
- case ExternalDataConstants.FORMAT_CSV:
- inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
- return new LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
- }
- }
switch (reader) {
- case ExternalDataConstants.READER_HDFS:
- return new HDFSDataSourceFactory();
- case ExternalDataConstants.READER_TWITTER_PULL:
- case ExternalDataConstants.READER_TWITTER_PUSH:
- return new TwitterRecordReaderFactory();
case ExternalDataConstants.READER_KV:
return new KVReaderFactory();
case ExternalDataConstants.READER_KV_TEST:
return new KVTestReaderFactory();
+ case ExternalDataConstants.READER_HDFS:
+ return new HDFSDataSourceFactory();
+ case ExternalDataConstants.ALIAS_LOCALFS_ADAPTER:
+ return new StreamRecordReaderFactory(new LocalFSInputStreamFactory());
+ case ExternalDataConstants.READER_TWITTER_PULL:
+ case ExternalDataConstants.READER_TWITTER_PUSH:
+ case ExternalDataConstants.READER_PUSH_TWITTER:
+ case ExternalDataConstants.READER_PULL_TWITTER:
+ return new TwitterRecordReaderFactory();
case ExternalDataConstants.TEST_RECORD_WITH_PK:
return new RecordWithPKTestReaderFactory();
+ case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
+ return new StreamRecordReaderFactory(new TwitterFirehoseStreamFactory());
+ case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
+ case ExternalDataConstants.SOCKET:
+ return new StreamRecordReaderFactory(new SocketServerInputStreamFactory());
+ case ExternalDataConstants.STREAM_SOCKET_CLIENT:
+ return new StreamRecordReaderFactory(new SocketClientInputStreamFactory());
default:
throw new AsterixException("unknown record reader factory: " + reader);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 06928b3..682fb89 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -70,7 +70,7 @@
case ExternalDataConstants.TEST_RECORD_WITH_PK:
return new TestRecordWithPKParserFactory();
default:
- throw new AsterixException("Unknown parser " + parser);
+ throw new AsterixException("Unknown format: " + parser);
}
}
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
new file mode 100644
index 0000000..ea8bc98
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.LineRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.QuotedLineRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Chooses and instantiates the {@link StreamRecordReader} to run over an input stream,
+ * based on the data format named in a configuration map.
+ */
+public class StreamRecordReaderProvider {
+    /** Reader formats for which {@link #createRecordReader} can build a record reader. */
+    public enum Format {
+        SEMISTRUCTURED,
+        CSV,
+        LINE_SEPARATED
+    }
+
+    /**
+     * Resolves the value stored under {@link ExternalDataConstants#KEY_FORMAT} to a reader
+     * {@link Format}.
+     *
+     * @param configuration
+     *            the configuration map to read the format name from
+     * @return the reader format matching the configured format name
+     * @throws AsterixException
+     *             if the format parameter is missing or names an unsupported format
+     */
+    public static Format getReaderFormat(Map<String, String> configuration) throws AsterixException {
+        String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        if (format == null) {
+            throw new AsterixException("Unspecified parameter: " + ExternalDataConstants.KEY_FORMAT);
+        }
+        switch (format) {
+            case ExternalDataConstants.FORMAT_ADM:
+            case ExternalDataConstants.FORMAT_JSON:
+            case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
+                return Format.SEMISTRUCTURED;
+            case ExternalDataConstants.FORMAT_LINE_SEPARATED:
+                return Format.LINE_SEPARATED;
+            case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
+            case ExternalDataConstants.FORMAT_CSV:
+                return Format.CSV;
+            default:
+                throw new AsterixException("Unknown format: " + format);
+        }
+    }
+
+    /**
+     * Creates the record reader for the given format on top of the given input stream.
+     *
+     * @param format
+     *            the reader format, e.g. as returned by {@link #getReaderFormat(Map)}
+     * @param inputStream
+     *            the stream handed to the created reader
+     * @param configuration
+     *            the map the quote, header and record start/end settings are read from
+     * @return a new record reader for {@code format}
+     * @throws HyracksDataException
+     *             if {@code format} is not one of the formats handled here
+     */
+    public static StreamRecordReader createRecordReader(Format format, AsterixInputStream inputStream,
+            Map<String, String> configuration) throws HyracksDataException {
+        switch (format) {
+            case CSV:
+                String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
+                boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
+                // A configured quote selects QuotedLineRecordReader; otherwise LineRecordReader is used.
+                if (quoteString != null) {
+                    return new QuotedLineRecordReader(hasHeader, inputStream, quoteString);
+                }
+                return new LineRecordReader(hasHeader, inputStream);
+            case LINE_SEPARATED:
+                return new EmptyLineSeparatedRecordReader(inputStream);
+            case SEMISTRUCTURED:
+                // Record start/end are passed exactly as returned by the map (null when not configured).
+                return new SemiStructuredRecordReader(inputStream,
+                        configuration.get(ExternalDataConstants.KEY_RECORD_START),
+                        configuration.get(ExternalDataConstants.KEY_RECORD_END));
+            default:
+                throw new HyracksDataException("Unknown format: " + format);
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a02152b..b5ec27a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -72,8 +72,6 @@
public static final String KEY_HIVE_SERDE = "hive-serde";
public static final String KEY_RSS_URL = "url";
public static final String KEY_INTERVAL = "interval";
- public static final String KEY_PULL = "pull";
- public static final String KEY_PUSH = "push";
public static final String KEY_IS_FEED = "is-feed";
public static final String KEY_WAIT_FOR_DATA = "wait-for-data";
public static final String KEY_FEED_NAME = "feed";
@@ -123,8 +121,10 @@
*/
public static final String READER_HDFS = "hdfs";
public static final String READER_KV = "key-value";
- public static final String READER_TWITTER_PUSH = "twitter-push";
- public static final String READER_TWITTER_PULL = "twitter-pull";
+ public static final String READER_TWITTER_PUSH = "twitter_push";
+ public static final String READER_PUSH_TWITTER = "push_twitter";
+ public static final String READER_TWITTER_PULL = "twitter_pull";
+ public static final String READER_PULL_TWITTER = "pull_twitter";
public static final String CLUSTER_LOCATIONS = "cluster-locations";
public static final String SCHEDULER = "hdfs-scheduler";
@@ -156,7 +156,7 @@
*/
public static final String STREAM_HDFS = "hdfs";
public static final String STREAM_LOCAL_FILESYSTEM = "localfs";
- public static final String STREAM_SOCKET = "socket";
+ public static final String SOCKET = "socket";
public static final String STREAM_SOCKET_CLIENT = "socket-client";
/**
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 42fe8bf..76898c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -84,9 +84,9 @@
if (reader == null) {
throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER + " must be specified.");
}
- String parser = configuration.get(ExternalDataConstants.KEY_PARSER);
+ String parser = configuration.get(ExternalDataConstants.KEY_FORMAT);
if (parser == null) {
- throw new AsterixException("The parameter " + ExternalDataConstants.KEY_PARSER + " must be specified.");
+ throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " must be specified.");
}
}
@@ -200,22 +200,6 @@
return false;
}
- public static boolean isPull(Map<String, String> configuration) {
- String pull = configuration.get(ExternalDataConstants.KEY_PULL);
- if (pull == null) {
- return false;
- }
- return Boolean.parseBoolean(pull);
- }
-
- public static boolean isPush(Map<String, String> configuration) {
- String push = configuration.get(ExternalDataConstants.KEY_PUSH);
- if (push == null) {
- return false;
- }
- return Boolean.parseBoolean(push);
- }
-
public static IRecordReaderFactory<?> createExternalRecordReaderFactory(Map<String, String> configuration)
throws AsterixException {
String readerFactory = configuration.get(ExternalDataConstants.KEY_READER_FACTORY);
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
index d822310..354aedb 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
@@ -73,7 +73,7 @@
new FileSplit[] { new FileSplit("",
new FileReference(Paths.get(getClass().getResource(path).toURI()).toFile())) },
null, null, 0, null, false);
- SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, null, "[", "]");
+ SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]");
Value val = new Value(objectPool);
while (recordReader.hasNext()) {
val.reset();
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index 851a7e0..fc6e725 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -86,7 +86,7 @@
null, null, 0, null, false);
// create reader record reader
- QuotedLineRecordReader lineReader = new QuotedLineRecordReader(true, inputStream, null,
+ QuotedLineRecordReader lineReader = new QuotedLineRecordReader(true, inputStream,
ExternalDataConstants.DEFAULT_QUOTE);
// create csv with json record reader
CSVToRecordWithMetadataAndPKConverter recordConverter = new CSVToRecordWithMetadataAndPKConverter(
diff --git a/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/external_index/external_index.2.ddl.aql b/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/external_index/external_index.2.ddl.aql
index 7aa1129..e6b7e21 100644
--- a/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/external_index/external_index.2.ddl.aql
+++ b/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/external_index/external_index.2.ddl.aql
@@ -36,7 +36,11 @@
create external dataset EmployeeDataset(EmployeeType)
using hdfs
-(("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/external-indexing-test.txt"),("input-format"="text-input-format"),("format"="delimited-text"),("delimiter"="|"));
+(("hdfs"="hdfs://127.0.0.1:31888"),
+("path"="/asterix/external-indexing-test.txt"),
+("input-format"="text-input-format"),
+("format"="delimited-text"),
+("delimiter"="|"));
create index EmployeeAgeIdx on EmployeeDataset(age);