Merge branch 'zheilbron/asterix_msr_demo' of https://code.google.com/p/asterixdb into zheilbron/asterix_msr_demo
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
index ec8b6be..3ffc1cc 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
@@ -29,6 +29,7 @@
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
@@ -37,6 +38,7 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
public class ConnectFeedStatement implements Statement {
@@ -89,11 +91,12 @@
}
}
- org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+ Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
try {
factoryOutput = FeedUtil.getFeedFactoryAndOutput(sourceFeed, mdTxnCtx);
- adapterOutputType = factoryOutput.getRight().getTypeName();
+ adapterOutputType = factoryOutput.second.getTypeName();
} catch (AlgebricksException ae) {
+ ae.printStackTrace();
throw new MetadataException(ae);
}
diff --git a/asterix-external-data/src/main/resources/schema/library.xsd b/asterix-external-data/src/main/resources/schema/library.xsd
index f58175a..00f71f5 100644
--- a/asterix-external-data/src/main/resources/schema/library.xsd
+++ b/asterix-external-data/src/main/resources/schema/library.xsd
@@ -9,9 +9,7 @@
<xs:element name="return_type" type="xs:string" />
<xs:element name="function_type" type="xs:string" />
<xs:element name="definition" type="xs:string" />
-
<xs:element name="factory_class" type="xs:string" />
- <xs:element name="adaptor_type" type="xs:string" />
<!-- definition of complex elements -->
@@ -40,7 +38,6 @@
<xs:sequence>
<xs:element ref="lib:name" />
<xs:element ref="lib:factory_class" />
- <xs:element ref="lib:adaptor_type" />
</xs:sequence>
</xs:complexType>
</xs:element>
@@ -58,7 +55,7 @@
<xs:sequence>
<xs:element ref="lib:language" />
<xs:element ref="lib:libraryFunctions" minOccurs="0" />
- <xs:element ref="lib:libraryAdapters" minOccurs="0" />
+ <xs:element ref="lib:libraryAdapters" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java
new file mode 100644
index 0000000..cb0142a
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library.adaptor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class TestTypedAdaptor extends StreamBasedAdapter implements IFeedAdapter {
+
+ private static final long serialVersionUID = 1L;
+
+ private final PipedOutputStream pos;
+
+ private final PipedInputStream pis;
+
+ private final Map<String, String> configuration;
+
+ private DummyGenerator generator;
+
+ public TestTypedAdaptor(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
+ Map<String, String> configuration) throws IOException {
+ super(parserFactory, sourceDatatype, ctx);
+ pos = new PipedOutputStream();
+ pis = new PipedInputStream(pos);
+ this.configuration = configuration;
+ }
+
+ @Override
+ public InputStream getInputStream(int partition) throws IOException {
+ return pis;
+ }
+
+ @Override
+ public void start(int partition, IFrameWriter frameWriter) throws Exception {
+ generator = new DummyGenerator(configuration, pos);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.execute(generator);
+ super.start(partition, frameWriter);
+ }
+
+ private static class DummyGenerator implements Runnable {
+
+ private final int nOutputRecords;
+ private final OutputStream os;
+ private final byte[] EOL = "\n".getBytes();
+ private boolean continueIngestion;
+
+ public DummyGenerator(Map<String, String> configuration, OutputStream os) {
+ nOutputRecords = Integer.parseInt(configuration.get(TestTypedAdaptorFactory.KEY_NUM_OUTPUT_RECORDS));
+ this.os = os;
+ this.continueIngestion = true;
+ }
+
+ @Override
+ public void run() {
+ DummyRecord dummyRecord = new DummyRecord();
+ try {
+ int i = 0;
+ while (continueIngestion && i < nOutputRecords) {
+ dummyRecord.reset(i + 1, "" + (i + 1));
+ os.write(dummyRecord.toString().getBytes());
+ os.write(EOL);
+ i++;
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ } finally {
+ try {
+ os.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void stop() {
+ continueIngestion = false;
+ }
+ }
+
+ private static class DummyRecord {
+
+ private int tweetid = 0;
+ private String text = null;
+
+ public void reset(int tweetid, String text) {
+ this.tweetid = tweetid;
+ this.text = text;
+ }
+
+ @Override
+ public String toString() {
+ return "{" + "\"tweetid\":" + "int64(" + "\"" + tweetid + "\"" + ")" + "," + "\"message-text\":" + "\""
+ + text + "\"" + "}";
+ }
+
+ }
+
+ @Override
+ public DataExchangeMode getDataExchangeMode() {
+ return DataExchangeMode.PUSH;
+ }
+
+ @Override
+ public void stop() throws Exception {
+ generator.stop();
+ }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java
new file mode 100644
index 0000000..39f7ab2
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library.adaptor;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class TestTypedAdaptorFactory implements ITypedAdapterFactory {
+
+ public static final String NAME = "test_typed_adaptor";
+
+ private static ARecordType adapterOutputType = initOutputType();
+
+ public static final String KEY_NUM_OUTPUT_RECORDS = "num_output_records";
+
+ private Map<String, String> configuration;
+
+ @Override
+ public SupportedOperation getSupportedOperations() {
+ return SupportedOperation.READ;
+ }
+
+ private static ARecordType initOutputType() {
+ String[] fieldNames = new String[] { "tweetid", "message-text" };
+ IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, BuiltinType.ASTRING };
+ ARecordType outputType = null;
+ try {
+ outputType = new ARecordType("TestTypedAdaptorOutputType", fieldNames, fieldTypes, false);
+ } catch (AsterixException exception) {
+ throw new IllegalStateException("Unable to create output type for adaptor " + NAME);
+ }
+ return outputType;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ return AdapterType.TYPED;
+ }
+
+ @Override
+ public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ return new AlgebricksCountPartitionConstraint(1);
+ }
+
+ @Override
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+ ITupleParserFactory tupleParserFactory = new AdmSchemafullRecordParserFactory(adapterOutputType);
+ return new TestTypedAdaptor(tupleParserFactory, adapterOutputType, ctx, configuration);
+ }
+
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return adapterOutputType;
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ this.configuration = configuration;
+ }
+
+}
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/text_functions.xml
index bd485a4..a0b7bf9 100644
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ b/asterix-external-data/src/test/resources/text_functions.xml
@@ -50,4 +50,10 @@
</definition>
</libraryFunction>
</libraryFunctions>
+ <libraryAdapters>
+ <libraryAdapter>
+ <name>test_typed_adaptor</name>
+ <factory_class>edu.uci.ics.asterix.external.library.adaptor.TestTypedAdaptorFactory</factory_class>
+ </libraryAdapter>
+ </libraryAdapters>
</externalLibrary>
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
new file mode 100644
index 0000000..f5fe458
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
@@ -0,0 +1,21 @@
+/*
+ * Description : Create a feed that uses the typed adapter test_typed_adaptor
+ provided by the external library testlib. The adapter is
+ configured through the num_output_records parameter, which
+ gives the number of records it generates. Also create the
+ dataset (TweetsTestAdaptor) that the feed is later connected to.
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+create type TestTypedAdaptorOutputType as closed {
+ tweetid: int64,
+ message-text: string
+}
+
+create dataset TweetsTestAdaptor(TestTypedAdaptorOutputType)
+primary key tweetid;
+
+create feed TestTypedAdaptorFeed
+using "testlib#test_typed_adaptor" (("num_output_records"="5"),("type-name"="TestTypedAdaptorOutputType"));
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
new file mode 100644
index 0000000..a26a148
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
@@ -0,0 +1,14 @@
+/*
+ * Description : Connect the feed TestTypedAdaptorFeed, which uses the typed
+ adapter test_typed_adaptor from the external library testlib,
+ to the dataset TweetsTestAdaptor. The wait-for-completion-feed
+ parameter is set to "true" before connecting, presumably so that
+ the statement returns only after ingestion has finished.
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+set wait-for-completion-feed "true";
+
+connect feed TestTypedAdaptorFeed to dataset TweetsTestAdaptor;
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
new file mode 100644
index 0000000..733b5a0
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
@@ -0,0 +1,14 @@
+/*
+ * Description : Query the dataset TweetsTestAdaptor, which is populated by
+ the feed TestTypedAdaptorFeed using the typed adapter
+ test_typed_adaptor from the external library testlib.
+ Records are returned ordered by tweetid so that the output
+ can be compared against the expected result file.
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+for $x in dataset TweetsTestAdaptor
+order by $x.tweetid
+return $x
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
index 374e1b3..43ff18b 100644
--- a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
@@ -27,9 +27,9 @@
}
create feed TweetFeed
-using "edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory"
+using file_feed
(("type-name"="TweetInputType"),("fs"="localfs"),("path"="127.0.0.1://../../../../../../asterix-app/data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
apply function testlib#parseTweet;
-create dataset Tweets(TweetOutputType)
+create dataset TweetsFeedIngest(TweetOutputType)
primary key id;
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
index 8dfa98d..7414bba 100644
--- a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
@@ -11,4 +11,4 @@
set wait-for-completion-feed "true";
-connect feed TweetFeed to dataset Tweets;
+connect feed TweetFeed to dataset TweetsFeedIngest;
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
index 5f74687..7d838be 100644
--- a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
@@ -9,5 +9,5 @@
*/
use dataverse externallibtest;
-for $x in dataset Tweets
+for $x in dataset TweetsFeedIngest
return $x
diff --git a/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm b/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm
new file mode 100644
index 0000000..2ad7b60
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm
@@ -0,0 +1,5 @@
+{ "tweetid": 1i64, "message-text": "1" }
+{ "tweetid": 2i64, "message-text": "2" }
+{ "tweetid": 3i64, "message-text": "3" }
+{ "tweetid": 4i64, "message-text": "4" }
+{ "tweetid": 5i64, "message-text": "5" }
diff --git a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
index 65780d9..be9ba0e 100644
--- a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
+++ b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
@@ -45,5 +45,12 @@
</compilation-unit>
</test-case>
</test-group>
+ <test-group name="library-adapters">
+ <test-case FilePath="library-adapters">
+ <compilation-unit name="typed_adapter">
+ <output-dir compare="Text">typed_adapter</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
</test-suite>
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index c6c1559..d29e2c5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
@@ -52,6 +53,7 @@
import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
@@ -96,6 +98,7 @@
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
@@ -414,13 +417,14 @@
FeedDataSource feedDataSource = (FeedDataSource) dataSource;
FeedIntakeOperatorDescriptor feedIngestor = null;
- org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+ Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
AlgebricksPartitionConstraint constraint = null;
try {
factoryOutput = FeedUtil.getFeedFactoryAndOutput(feedDataSource.getFeed(), mdTxnCtx);
- IAdapterFactory adapterFactory = factoryOutput.getLeft();
- ARecordType adapterOutputType = factoryOutput.getRight();
+ IAdapterFactory adapterFactory = factoryOutput.first;
+ ARecordType adapterOutputType = factoryOutput.second;
+ AdapterType adapterType = factoryOutput.third;
ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
.getSerializerDeserializer(adapterOutputType);
@@ -432,12 +436,25 @@
throw new AlgebricksException("Feed not configured with a policy");
}
feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
- feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
- feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
- .getFeedConnectionId().getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
- feedDesc, feedPolicy.getProperties());
-
- constraint = factoryOutput.getLeft().getPartitionConstraint();
+ switch (adapterType) {
+ case INTERNAL:
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
+ feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
+ .getFeedConnectionId().getDatasetName()), adapterFactory,
+ (ARecordType) adapterOutputType, feedDesc, feedPolicy.getProperties());
+ break;
+ case EXTERNAL:
+ String libraryName = feedDataSource.getFeed().getAdaptorName().split("#")[0];
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feedDataSource.getFeedConnectionId(),
+ libraryName, adapterFactory.getClass().getName(), feedDataSource.getFeed()
+ .getAdaptorConfiguration(), (ARecordType) adapterOutputType, feedDesc,
+ feedPolicy.getProperties());
+ break;
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Cofigured feed intake operator with " + adapterType + " adapter");
+ }
+ constraint = factoryOutput.first.getPartitionConstraint();
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -1024,7 +1041,8 @@
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+ jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_BTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
@@ -1150,7 +1168,8 @@
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_INVERTED_INDEX);
+ jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_INVERTED_INDEX);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
@@ -1241,7 +1260,8 @@
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
+ jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_RTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
@@ -1427,6 +1447,9 @@
"edu.uci.ics.asterix.external.dataset.adapter..RSSFeedAdapterFactory");
adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter",
"edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapterFactory");
+ adapterFactoryMapping.put("edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapter",
+ "edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory");
+
return adapterFactoryMapping;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 859ed11..b732191 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -23,6 +23,7 @@
import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.asterix.common.feeds.IFeedManager;
+import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -44,7 +45,7 @@
private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
/** The type associated with the ADM data output from the feed adaptor */
- private final IAType outptuType;
+ private final IAType outputType;
/** unique identifier for a feed instance. */
private final FeedConnectionId feedId;
@@ -58,12 +59,39 @@
/** The (singleton) instance of IFeedManager **/
private IFeedManager feedManager;
+ /** The library that contains the adapter in use. **/
+ private String adapterLibraryName;
+
+ /**
+ * The adapter factory class that is used to create an instance of the feed adapter.
+ * This value is used only in the case of external adapters.
+ **/
+ private String adapterFactoryClassName;
+
+ /** The configuration parameters associated with the adapter. **/
+ private Map<String, String> adapterConfiguration;
+
+ private ARecordType adapterOutputType;
+
public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, IAdapterFactory adapterFactory,
ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicy) {
super(spec, 0, 1);
recordDescriptors[0] = rDesc;
this.adapterFactory = adapterFactory;
- this.outptuType = atype;
+ this.outputType = atype;
+ this.feedId = feedId;
+ this.feedPolicy = feedPolicy;
+ }
+
+ public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, String adapterLibraryName,
+ String adapterFactoryClassName, Map<String, String> configuration, ARecordType atype,
+ RecordDescriptor rDesc, Map<String, String> feedPolicy) {
+ super(spec, 0, 1);
+ recordDescriptors[0] = rDesc;
+ this.adapterFactoryClassName = adapterFactoryClassName;
+ this.adapterConfiguration = configuration;
+ this.adapterLibraryName = adapterLibraryName;
+ this.outputType = atype;
this.feedId = feedId;
this.feedPolicy = feedPolicy;
}
@@ -80,7 +108,7 @@
try {
if (ingestionRuntime == null) {
// create an instance of a feed adaptor to ingest data.
- adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+ adapter = createAdapter(ctx, partition);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Beginning new feed:" + feedId);
}
@@ -113,10 +141,68 @@
}
public IAType getOutputType() {
- return outptuType;
+ return outputType;
}
public RecordDescriptor getRecordDescriptor() {
return recordDescriptors[0];
}
+
+ private IFeedAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+ IFeedAdapter feedAdapter = null;
+ if (adapterFactory != null) {
+ feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+ } else {
+ ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
+ adapterLibraryName);
+ if (classLoader != null) {
+ IAdapterFactory adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adapterFactoryClassName)
+ .newInstance()));
+
+ switch (adapterFactory.getAdapterType()) {
+ case TYPED: {
+ ((ITypedAdapterFactory) adapterFactory).configure(adapterConfiguration);
+ feedAdapter = (IFeedAdapter) ((ITypedAdapterFactory) adapterFactory).createAdapter(ctx,
+ partition);
+ }
+ break;
+ case GENERIC: {
+ String outputTypeName = adapterConfiguration.get(IGenericAdapterFactory.KEY_TYPE_NAME);
+ if (outputTypeName == null) {
+ throw new IllegalArgumentException(
+ "You must specify the datatype associated with the incoming data. Datatype is specified by the "
+ + IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
+ }
+ ((IGenericAdapterFactory) adapterFactory).configure(adapterConfiguration,
+ (ARecordType) adapterOutputType);
+ ((IGenericAdapterFactory) adapterFactory).createAdapter(ctx, partition);
+ }
+ break;
+ }
+
+ feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+ } else {
+ String message = "Unable to create adapter as class loader not configured for library "
+ + adapterLibraryName + " in dataverse " + feedId.getDataverse();
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(message);
+ }
+ throw new IllegalArgumentException(message);
+
+ }
+ }
+ return feedAdapter;
+ }
+
+ public String getAdapterLibraryName() {
+ return adapterLibraryName;
+ }
+
+ public String getAdapterFactoryClassName() {
+ return adapterFactoryClassName;
+ }
+
+ public Map<String, String> getAdapterConfiguration() {
+ return adapterConfiguration;
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index 8f140d7..6e0ed83 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -34,6 +34,7 @@
import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
@@ -41,6 +42,7 @@
import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
@@ -94,9 +96,15 @@
IOperatorDescriptor opDesc = entry.getValue();
if (opDesc instanceof FeedIntakeOperatorDescriptor) {
FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
- FeedIntakeOperatorDescriptor fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(),
- orig.getAdapterFactory(), (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(),
- orig.getFeedPolicy());
+ FeedIntakeOperatorDescriptor fiop;
+ if (orig.getAdapterFactory() != null) {
+ fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterFactory(),
+ (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
+ } else {
+ fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterLibraryName(),
+ orig.getAdapterFactoryClassName(), orig.getAdapterConfiguration(),
+ (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
+ }
oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
} else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
@@ -221,7 +229,7 @@
}
- public static Pair<IAdapterFactory, ARecordType> getFeedFactoryAndOutput(Feed feed,
+ public static Triple<IAdapterFactory, ARecordType, AdapterType> getFeedFactoryAndOutput(Feed feed,
MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
String adapterName = null;
@@ -229,7 +237,7 @@
String adapterFactoryClassname = null;
IAdapterFactory adapterFactory = null;
ARecordType adapterOutputType = null;
- Pair<IAdapterFactory, ARecordType> feedProps = null;
+ Triple<IAdapterFactory, ARecordType, AdapterType> feedProps = null;
try {
adapterName = feed.getAdaptorName();
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
@@ -254,8 +262,7 @@
}
} else {
adapterFactoryClassname = AqlMetadataProvider.adapterFactoryMapping.get(adapterName);
- if (adapterFactoryClassname != null) {
- } else {
+ if (adapterFactoryClassname == null) {
adapterFactoryClassname = adapterName;
}
adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
@@ -283,7 +290,8 @@
throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
}
- feedProps = Pair.of(adapterFactory, adapterOutputType);
+ feedProps = new Triple<IAdapterFactory, ARecordType, AdapterType>(adapterFactory, adapterOutputType,
+ adapterEntity.getType());
} catch (Exception e) {
e.printStackTrace();
throw new AlgebricksException("unable to create adapter " + e);
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 9c60779..299bbfb 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -26,17 +26,14 @@
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.ABooleanTypeComputer;
-import edu.uci.ics.asterix.om.typecomputer.impl.ACircleTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.ADateTimeTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.ADateTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.ADoubleTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.AFloatTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.AInt32TypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.AInt64TypeComputer;
-import edu.uci.ics.asterix.om.typecomputer.impl.ALineTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.ANullTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.APointTypeComputer;
-import edu.uci.ics.asterix.om.typecomputer.impl.APolygonTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.ARectangleTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.AStringTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.ATimeTypeComputer;
@@ -637,12 +634,12 @@
addFunction(COUNT, AInt64TypeComputer.INSTANCE, true);
addPrivateFunction(COUNTHASHED_GRAM_TOKENS, OrderedListOfAInt32TypeComputer.INSTANCE, true);
addPrivateFunction(COUNTHASHED_WORD_TOKENS, OrderedListOfAInt32TypeComputer.INSTANCE, true);
- addFunction(CREATE_CIRCLE, ACircleTypeComputer.INSTANCE, true);
- addFunction(CREATE_LINE, ALineTypeComputer.INSTANCE, true);
- addPrivateFunction(CREATE_MBR, ADoubleTypeComputer.INSTANCE, true);
- addFunction(CREATE_POINT, APointTypeComputer.INSTANCE, true);
- addFunction(CREATE_POLYGON, APolygonTypeComputer.INSTANCE, true);
- addFunction(CREATE_RECTANGLE, ARectangleTypeComputer.INSTANCE, true);
+ addFunction(CREATE_CIRCLE, OptionalACircleTypeComputer.INSTANCE, true);
+ addFunction(CREATE_LINE, OptionalALineTypeComputer.INSTANCE, true);
+ addPrivateFunction(CREATE_MBR, OptionalADoubleTypeComputer.INSTANCE, true);
+ addFunction(CREATE_POINT, OptionalAPointTypeComputer.INSTANCE, true);
+ addFunction(CREATE_POLYGON, OptionalAPolygonTypeComputer.INSTANCE, true);
+ addFunction(CREATE_RECTANGLE, OptionalARectangleTypeComputer.INSTANCE, true);
addFunction(CREATE_UUID, AUUIDTypeComputer.INSTANCE, false);
addFunction(DATE_CONSTRUCTOR, OptionalADateTypeComputer.INSTANCE, true);
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ConcatNonNullTypeComputer.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ConcatNonNullTypeComputer.java
index 7bf2668..7680c15 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ConcatNonNullTypeComputer.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ConcatNonNullTypeComputer.java
@@ -15,12 +15,7 @@
package edu.uci.ics.asterix.om.typecomputer.impl;
-import java.util.ArrayList;
-import java.util.List;
-
import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -46,29 +41,18 @@
if (f.getArguments().size() < 1) {
return BuiltinType.ANULL;
}
- List<IAType> possibleTypes = new ArrayList<IAType>();
+
+ TypeCompatibilityChecker tcc = new TypeCompatibilityChecker();
for (int i = 0; i < f.getArguments().size(); i++) {
ILogicalExpression arg = f.getArguments().get(i).getValue();
IAType type = (IAType) env.getType(arg);
- if (type.getTypeTag() == ATypeTag.UNION) {
- List<IAType> typeList = ((AUnionType) type).getUnionList();
- for (IAType t : typeList) {
- if (t.getTypeTag() != ATypeTag.NULL) {
- //CONCAT_NON_NULL cannot return null because it's only used for if-else construct
- if (!possibleTypes.contains(t))
- possibleTypes.add(t);
- }
- }
- } else {
- if (!possibleTypes.contains(type))
- possibleTypes.add(type);
- }
+ tcc.addPossibleType(type);
}
- if (possibleTypes.size() == 1) {
- return possibleTypes.get(0);
- } else {
+
+ IAType result = tcc.getCompatibleType();
+ if (result == null) {
throw new AlgebricksException("The two branches of the if-else clause should return the same type.");
}
+ return result;
}
-
}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/NonTaggedSwitchCaseComputer.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/NonTaggedSwitchCaseComputer.java
index c1450ca..bd67f18 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/NonTaggedSwitchCaseComputer.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/NonTaggedSwitchCaseComputer.java
@@ -15,10 +15,7 @@
package edu.uci.ics.asterix.om.typecomputer.impl;
import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
@@ -40,46 +37,27 @@
if (fce.getArguments().size() < 3)
throw new AlgebricksException(errMsg1);
- IAType t0;
- IAType t1;
- IAType ti;
-
- ATypeTag tag0;
- ATypeTag tag1;
- ATypeTag tagi;
- try {
- t0 = (IAType) env.getType(fce.getArguments().get(0).getValue());
- t1 = (IAType) env.getType(fce.getArguments().get(2).getValue());
- tag0 = t0.getTypeTag();
- tag1 = t1.getTypeTag();
- if (t0.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) t0))
- tag0 = ((AUnionType) t0).getUnionList().get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST)
- .getTypeTag();
- if (t1.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) t1))
- tag1 = ((AUnionType) t1).getUnionList().get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST)
- .getTypeTag();
- for (int i = 2; i < fce.getArguments().size(); i += 2) {
- ti = (IAType) env.getType(fce.getArguments().get(i).getValue());
- tagi = ti.getTypeTag();
- if (ti.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) ti))
- tagi = ((AUnionType) ti).getUnionList().get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST)
- .getTypeTag();
- if (tag1 != tagi)
- if (!t1.toString().equals(ti.toString()))
- throw new AlgebricksException(errMsg2);
- }
- for (int i = 1; i < fce.getArguments().size(); i += 2) {
- ti = (IAType) env.getType(fce.getArguments().get(i).getValue());
- tagi = ti.getTypeTag();
- if (ti.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) ti))
- tagi = ((AUnionType) ti).getUnionList().get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST)
- .getTypeTag();
- if (tag0 != tagi)
- throw new AlgebricksException(errMsg3);
- }
- } catch (AlgebricksException e) {
- throw new AlgebricksException(e);
+ TypeCompatibilityChecker tcc = new TypeCompatibilityChecker();
+ for (int i = 2; i < fce.getArguments().size(); i += 2) {
+ IAType ti = (IAType) env.getType(fce.getArguments().get(i).getValue());
+ tcc.addPossibleType(ti);
}
- return t1;
+ IAType valueType = tcc.getCompatibleType();
+ if (valueType == null) {
+ throw new AlgebricksException(errMsg2);
+ }
+
+ IAType switchType = (IAType) env.getType(fce.getArguments().get(0).getValue());
+ tcc.reset();
+ tcc.addPossibleType(switchType);
+ for (int i = 1; i < fce.getArguments().size(); i += 2) {
+ IAType ti = (IAType) env.getType(fce.getArguments().get(i).getValue());
+ tcc.addPossibleType(ti);
+ }
+ IAType caseType = tcc.getCompatibleType();
+ if (caseType == null) {
+ throw new AlgebricksException(errMsg3);
+ }
+ return valueType;
}
}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/TypeCompatibilityChecker.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/TypeCompatibilityChecker.java
new file mode 100644
index 0000000..0739b2f
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/TypeCompatibilityChecker.java
@@ -0,0 +1,69 @@
+package edu.uci.ics.asterix.om.typecomputer.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+
+/**
+ * Accumulates the types that a set of expressions may evaluate to and reports
+ * the single type they have in common, if there is one. NULL is tracked
+ * separately from the other types, and a union contributes each of its members.
+ */
+class TypeCompatibilityChecker {
+ private final List<IAType> possibleTypes;
+ private boolean nullEncountered;
+
+ public TypeCompatibilityChecker() {
+ this.possibleTypes = new ArrayList<IAType>();
+ this.nullEncountered = false;
+ }
+
+ /** Forgets every type recorded so far so that the checker can be reused. */
+ public void reset() {
+ nullEncountered = false;
+ possibleTypes.clear();
+ }
+
+ /**
+ * Records a candidate type. A union is not stored itself; each type in its
+ * union list is recorded individually instead.
+ */
+ public void addPossibleType(IAType type) {
+ if (type.getTypeTag() != ATypeTag.UNION) {
+ recordType(type);
+ return;
+ }
+ for (IAType member : ((AUnionType) type).getUnionList()) {
+ recordType(member);
+ }
+ }
+
+ /** Notes a NULL occurrence, or remembers a non-NULL type not seen before. */
+ private void recordType(IAType candidate) {
+ if (candidate.getTypeTag() == ATypeTag.NULL) {
+ nullEncountered = true;
+ } else if (!possibleTypes.contains(candidate)) {
+ possibleTypes.add(candidate);
+ }
+ }
+
+ /**
+ * Returns ANULL when no non-NULL type was recorded, the only recorded type
+ * (made nullable if NULL was also seen) when there is exactly one, and
+ * {@code null} when more than one distinct non-NULL type was recorded.
+ */
+ public IAType getCompatibleType() {
+ if (possibleTypes.isEmpty()) {
+ return BuiltinType.ANULL;
+ }
+ if (possibleTypes.size() > 1) {
+ return null;
+ }
+ IAType onlyType = possibleTypes.get(0);
+ return nullEncountered ? AUnionType.createNullableType(onlyType) : onlyType;
+ }
+}
\ No newline at end of file
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
index 9ce9b24..ffbbd64 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
@@ -15,6 +15,9 @@
package edu.uci.ics.asterix.om.types;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* There is a unique tag for each primitive type and for each kind of
* non-primitive type in the object model.
@@ -72,6 +75,24 @@
return value;
}
- public final static int TYPE_COUNT = ATypeTag.values().length;
+ public static final int TYPE_COUNT = ATypeTag.values().length;
+
+ /**
+ * Tags indexed by their {@code value}; an index that no tag uses holds
+ * {@code null}.
+ */
+ public static final ATypeTag[] VALUE_TYPE_MAPPING;
+
+ static {
+ List<ATypeTag> tagsByValue = new ArrayList<>();
+ for (ATypeTag tag : values()) {
+ // Pad with nulls until the slot for this tag's value exists.
+ for (int slot = tagsByValue.size(); slot <= tag.value; slot++) {
+ tagsByValue.add(null);
+ }
+ tagsByValue.set(tag.value, tag);
+ }
+ VALUE_TYPE_MAPPING = tagsByValue.toArray(new ATypeTag[tagsByValue.size()]);
+ }
}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/constructors/AStringConstructorDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/constructors/AStringConstructorDescriptor.java
index 08105a2..7cf588e 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/constructors/AStringConstructorDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/constructors/AStringConstructorDescriptor.java
@@ -16,7 +16,16 @@
import java.io.DataOutput;
import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
@@ -32,13 +41,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class AStringConstructorDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
public IFunctionDescriptor createFunctionDescriptor() {
return new AStringConstructorDescriptor();
@@ -52,34 +60,118 @@
@Override
public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
- return new ICopyEvaluator() {
+ try {
+ return new ICopyEvaluator() {
- private DataOutput out = output.getDataOutput();
- private ArrayBackedValueStorage outInput = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(outInput);
- private String errorMessage = "This can not be an instance of string";
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
+ private DataOutput out = output.getDataOutput();
+ private ArrayBackedValueStorage outInput = new ArrayBackedValueStorage();
+ private ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
+ private PrintStream ps = new PrintStream(baaos, false, "UTF-8");
+ private ICopyEvaluator eval = args[0].createEvaluator(outInput);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
- @Override
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+ /**
+ * Evaluates the argument and writes it out as a string: NULL stays NULL, a
+ * STRING is copied unchanged, and INT8/16/32/64, DOUBLE, FLOAT and BOOLEAN
+ * are printed as text. Any other type is rejected.
+ */
+ @Override
+ public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+ try {
+ outInput.reset();
+ eval.evaluate(tuple);
+ byte[] serString = outInput.getByteArray();
- try {
- outInput.reset();
- eval.evaluate(tuple);
- byte[] serString = outInput.getByteArray();
- if (serString[0] == SER_STRING_TYPE_TAG) {
- out.write(outInput.getByteArray(), outInput.getStartOffset(), outInput.getLength());
- } else if (serString[0] == SER_NULL_TYPE_TAG)
- nullSerde.serialize(ANull.NULL, out);
- else
- throw new AlgebricksException(errorMessage);
- } catch (IOException e1) {
- throw new AlgebricksException(errorMessage);
+ ATypeTag tt = ATypeTag.VALUE_TYPE_MAPPING[serString[0]];
+ if (tt == ATypeTag.NULL) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else if (tt == ATypeTag.STRING) {
+ out.write(outInput.getByteArray(), outInput.getStartOffset(), outInput.getLength());
+ } else {
+ // Start from an empty buffer; it is reused for every tuple.
+ baaos.reset();
+ // Two placeholder bytes for the length prefix, filled in below.
+ baaos.write(0);
+ baaos.write(0);
+ switch (tt) {
+ case INT8: {
+ int i = AInt8SerializerDeserializer.getByte(outInput.getByteArray(), 1);
+ ps.print(i);
+ break;
+ }
+ case INT16: {
+ int i = AInt16SerializerDeserializer.getShort(outInput.getByteArray(), 1);
+ ps.print(i);
+ break;
+ }
+ case INT32: {
+ int i = AInt32SerializerDeserializer.getInt(outInput.getByteArray(), 1);
+ ps.print(i);
+ break;
+ }
+ case INT64: {
+ long l = AInt64SerializerDeserializer.getLong(outInput.getByteArray(), 1);
+ ps.print(l);
+ break;
+ }
+ case DOUBLE: {
+ double d = ADoubleSerializerDeserializer.getDouble(outInput.getByteArray(),
+ 1);
+ ps.print(d);
+ break;
+ }
+ case FLOAT: {
+ float f = AFloatSerializerDeserializer.getFloat(outInput.getByteArray(), 1);
+ ps.print(f);
+ break;
+ }
+ case BOOLEAN: {
+ boolean b = ABooleanSerializerDeserializer.getBoolean(
+ outInput.getByteArray(), 1);
+ ps.print(b);
+ break;
+ }
+
+ // NotYetImplemented
+ case CIRCLE:
+ case DATE:
+ case DATETIME:
+ case LINE:
+ case TIME:
+ case DURATION:
+ case YEARMONTHDURATION:
+ case DAYTIMEDURATION:
+ case INTERVAL:
+ case ORDEREDLIST:
+ case POINT:
+ case POINT3D:
+ case RECTANGLE:
+ case POLYGON:
+ case RECORD:
+ case UNORDEREDLIST:
+ case UUID:
+ default:
+ throw new AlgebricksException("string of " + tt + " not supported");
+ }
+ ps.flush();
+ byte[] tmpStrBytes = baaos.getByteArray();
+ int utfLen = baaos.size() - 2;
+ tmpStrBytes[0] = (byte) ((utfLen >>> 8) & 0xFF);
+ tmpStrBytes[1] = (byte) ((utfLen >>> 0) & 0xFF);
+ out.write(ATypeTag.STRING.serialize());
+ // Emit only the size() bytes written for this value; the array may be longer.
+ out.write(tmpStrBytes, 0, baaos.size());
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
}
- }
- };
+ };
+ } catch (UnsupportedEncodingException e) {
+ throw new AlgebricksException(e);
+ }
}
};
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 41b1915..8f24dec 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -1,5 +1,5 @@
/*
-x * Copyright 2009-2013 by The Regents of the University of California
+ * Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from