added test for installation and use of an external adaptor
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
index ec8b6be..76aae15 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
@@ -29,6 +29,7 @@
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
@@ -37,6 +38,7 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
public class ConnectFeedStatement implements Statement {
@@ -89,10 +91,10 @@
}
}
- org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+ Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
try {
factoryOutput = FeedUtil.getFeedFactoryAndOutput(sourceFeed, mdTxnCtx);
- adapterOutputType = factoryOutput.getRight().getTypeName();
+ adapterOutputType = factoryOutput.second.getTypeName();
} catch (AlgebricksException ae) {
throw new MetadataException(ae);
}
diff --git a/asterix-external-data/src/main/resources/schema/library.xsd b/asterix-external-data/src/main/resources/schema/library.xsd
index f58175a..00f71f5 100644
--- a/asterix-external-data/src/main/resources/schema/library.xsd
+++ b/asterix-external-data/src/main/resources/schema/library.xsd
@@ -9,9 +9,7 @@
<xs:element name="return_type" type="xs:string" />
<xs:element name="function_type" type="xs:string" />
<xs:element name="definition" type="xs:string" />
-
<xs:element name="factory_class" type="xs:string" />
- <xs:element name="adaptor_type" type="xs:string" />
<!-- definition of complex elements -->
@@ -40,7 +38,6 @@
<xs:sequence>
<xs:element ref="lib:name" />
<xs:element ref="lib:factory_class" />
- <xs:element ref="lib:adaptor_type" />
</xs:sequence>
</xs:complexType>
</xs:element>
@@ -58,7 +55,7 @@
<xs:sequence>
<xs:element ref="lib:language" />
<xs:element ref="lib:libraryFunctions" minOccurs="0" />
- <xs:element ref="lib:libraryAdapters" minOccurs="0" />
+ <xs:element ref="lib:libraryAdapters" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java
new file mode 100644
index 0000000..cb0142a
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library.adaptor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class TestTypedAdaptor extends StreamBasedAdapter implements IFeedAdapter {
+
+ private static final long serialVersionUID = 1L;
+
+ private final PipedOutputStream pos;
+
+ private final PipedInputStream pis;
+
+ private final Map<String, String> configuration;
+
+ private DummyGenerator generator;
+
+ public TestTypedAdaptor(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
+ Map<String, String> configuration) throws IOException {
+ super(parserFactory, sourceDatatype, ctx);
+ pos = new PipedOutputStream();
+ pis = new PipedInputStream(pos);
+ this.configuration = configuration;
+ }
+
+ @Override
+ public InputStream getInputStream(int partition) throws IOException {
+ return pis;
+ }
+
+ @Override
+ public void start(int partition, IFrameWriter frameWriter) throws Exception {
+ generator = new DummyGenerator(configuration, pos);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.execute(generator);
+ super.start(partition, frameWriter);
+ }
+
+ private static class DummyGenerator implements Runnable {
+
+ private final int nOutputRecords;
+ private final OutputStream os;
+ private final byte[] EOL = "\n".getBytes();
+ private boolean continueIngestion;
+
+ public DummyGenerator(Map<String, String> configuration, OutputStream os) {
+ nOutputRecords = Integer.parseInt(configuration.get(TestTypedAdaptorFactory.KEY_NUM_OUTPUT_RECORDS));
+ this.os = os;
+ this.continueIngestion = true;
+ }
+
+ @Override
+ public void run() {
+ DummyRecord dummyRecord = new DummyRecord();
+ try {
+ int i = 0;
+ while (continueIngestion && i < nOutputRecords) {
+ dummyRecord.reset(i + 1, "" + (i + 1));
+ os.write(dummyRecord.toString().getBytes());
+ os.write(EOL);
+ i++;
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ } finally {
+ try {
+ os.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void stop() {
+ continueIngestion = false;
+ }
+ }
+
+ private static class DummyRecord {
+
+ private int tweetid = 0;
+ private String text = null;
+
+ public void reset(int tweetid, String text) {
+ this.tweetid = tweetid;
+ this.text = text;
+ }
+
+ @Override
+ public String toString() {
+ return "{" + "\"tweetid\":" + "int64(" + "\"" + tweetid + "\"" + ")" + "," + "\"message-text\":" + "\""
+ + text + "\"" + "}";
+ }
+
+ }
+
+ @Override
+ public DataExchangeMode getDataExchangeMode() {
+ return DataExchangeMode.PUSH;
+ }
+
+ @Override
+ public void stop() throws Exception {
+ generator.stop();
+ }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java
new file mode 100644
index 0000000..39f7ab2
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library.adaptor;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class TestTypedAdaptorFactory implements ITypedAdapterFactory {
+
+ public static final String NAME = "test_typed_adaptor";
+
+ private static ARecordType adapterOutputType = initOutputType();
+
+ public static final String KEY_NUM_OUTPUT_RECORDS = "num_output_records";
+
+ private Map<String, String> configuration;
+
+ @Override
+ public SupportedOperation getSupportedOperations() {
+ return SupportedOperation.READ;
+ }
+
+ private static ARecordType initOutputType() {
+ String[] fieldNames = new String[] { "tweetid", "message-text" };
+ IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, BuiltinType.ASTRING };
+ ARecordType outputType = null;
+ try {
+ outputType = new ARecordType("TestTypedAdaptorOutputType", fieldNames, fieldTypes, false);
+ } catch (AsterixException exception) {
+ throw new IllegalStateException("Unable to create output type for adaptor " + NAME);
+ }
+ return outputType;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ return AdapterType.TYPED;
+ }
+
+ @Override
+ public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ return new AlgebricksCountPartitionConstraint(1);
+ }
+
+ @Override
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+ ITupleParserFactory tupleParserFactory = new AdmSchemafullRecordParserFactory(adapterOutputType);
+ return new TestTypedAdaptor(tupleParserFactory, adapterOutputType, ctx, configuration);
+ }
+
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return adapterOutputType;
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) throws Exception {
+ this.configuration = configuration;
+ }
+
+}
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/text_functions.xml
index bd485a4..a0b7bf9 100644
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ b/asterix-external-data/src/test/resources/text_functions.xml
@@ -50,4 +50,10 @@
</definition>
</libraryFunction>
</libraryFunctions>
+ <libraryAdapters>
+ <libraryAdapter>
+ <name>test_typed_adaptor</name>
+ <factory_class>edu.uci.ics.asterix.external.library.adaptor.TestTypedAdaptorFactory</factory_class>
+ </libraryAdapter>
+ </libraryAdapters>
</externalLibrary>
diff --git a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java
index a6be981..9ef80ea 100644
--- a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java
+++ b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java
@@ -59,7 +59,9 @@
@Test
public void test() throws Exception {
for (TestCaseContext testCaseCtx : testCaseCollection) {
- TestsUtils.executeTest(PATH_ACTUAL, testCaseCtx, null);
+ if (testCaseCtx.getTestCase().getCompilationUnit().get(0).getName().contains("adapt")) {
+ TestsUtils.executeTest(PATH_ACTUAL, testCaseCtx, null);
+ }
}
}
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
new file mode 100644
index 0000000..0c2df82
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
@@ -0,0 +1,21 @@
+/*
+ * Description : Create a feed dataset that uses the feed simulator adapter.
+ The feed simulator simulates feed from a file in the local fs.
+ Associate with the feed an external user-defined function. The UDF
+ finds topics in each tweet. A topic is identified by a #.
+ Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+create type TestTypedAdaptorOutputType as closed {
+ tweetid: int64,
+ message-text: string
+}
+
+create dataset Tweets(TestTypedAdaptorOutputType)
+primary key tweetid;
+
+create feed TestTypedAdaptorFeed
+using "externallibtest#test_typed_adaptor" (("num_output_records"="5"),("type-name"="TestTypedAdaptorOutputType"));
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
new file mode 100644
index 0000000..eb1bc9a
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
@@ -0,0 +1,14 @@
+/*
+ * Description : Create a feed dataset that uses the feed simulator adapter.
+ The feed simulator simulates feed from a file in the local fs.
+ Associate with the feed an external user-defined function. The UDF
+ finds topics in each tweet. A topic is identified by a #.
+ Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+set wait-for-completion-feed "true";
+
+connect feed TestTypedAdaptorFeed to dataset Tweets;
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
new file mode 100644
index 0000000..9729a84
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
@@ -0,0 +1,14 @@
+/*
+ * Description : Create a feed dataset that uses the feed simulator adapter.
+ The feed simulator simulates feed from a file in the local fs.
+ Associate with the feed an external user-defined function. The UDF
+ finds topics in each tweet. A topic is identified by a #.
+ Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+for $x in dataset Tweets
+order by $x.tweetid
+return $x
diff --git a/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm b/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm
new file mode 100644
index 0000000..2ad7b60
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm
@@ -0,0 +1,5 @@
+{ "tweetid": 1i64, "message-text": "1" }
+{ "tweetid": 2i64, "message-text": "2" }
+{ "tweetid": 3i64, "message-text": "3" }
+{ "tweetid": 4i64, "message-text": "4" }
+{ "tweetid": 5i64, "message-text": "5" }
diff --git a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
index 65780d9..be9ba0e 100644
--- a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
+++ b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
@@ -45,5 +45,12 @@
</compilation-unit>
</test-case>
</test-group>
+ <test-group name="library-adapters">
+ <test-case FilePath="library-adapters">
+ <compilation-unit name="typed_adapter">
+ <output-dir compare="Text">typed_adapter</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
</test-suite>
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 1dc220a..5a59af6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
@@ -52,6 +53,7 @@
import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
@@ -96,6 +98,7 @@
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
@@ -414,13 +417,14 @@
FeedDataSource feedDataSource = (FeedDataSource) dataSource;
FeedIntakeOperatorDescriptor feedIngestor = null;
- org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+ Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
AlgebricksPartitionConstraint constraint = null;
try {
factoryOutput = FeedUtil.getFeedFactoryAndOutput(feedDataSource.getFeed(), mdTxnCtx);
- IAdapterFactory adapterFactory = factoryOutput.getLeft();
- ARecordType adapterOutputType = factoryOutput.getRight();
+ IAdapterFactory adapterFactory = factoryOutput.first;
+ ARecordType adapterOutputType = factoryOutput.second;
+ AdapterType adapterType = factoryOutput.third;
ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
.getSerializerDeserializer(adapterOutputType);
@@ -432,12 +436,25 @@
throw new AlgebricksException("Feed not configured with a policy");
}
feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
- feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
- feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
- .getFeedConnectionId().getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
- feedDesc, feedPolicy.getProperties());
-
- constraint = factoryOutput.getLeft().getPartitionConstraint();
+ switch (adapterType) {
+ case INTERNAL:
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
+ feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
+ .getFeedConnectionId().getDatasetName()), adapterFactory,
+ (ARecordType) adapterOutputType, feedDesc, feedPolicy.getProperties());
+ break;
+ case EXTERNAL:
+ String libraryName = feedDataSource.getFeed().getAdaptorName().split("#")[0];
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feedDataSource.getFeedConnectionId(),
+ libraryName, adapterFactory.getClass().getName(), feedDataSource.getFeed()
+ .getAdaptorConfiguration(), (ARecordType) adapterOutputType, feedDesc,
+ feedPolicy.getProperties());
+ break;
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Cofigured feed intake operator with " + adapterType + " adapter");
+ }
+ constraint = factoryOutput.first.getPartitionConstraint();
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -1029,7 +1046,8 @@
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+ jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_BTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
@@ -1155,7 +1173,8 @@
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_INVERTED_INDEX);
+ jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_INVERTED_INDEX);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
@@ -1246,7 +1265,8 @@
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
+ jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_RTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 859ed11..b732191 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -23,6 +23,7 @@
import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.asterix.common.feeds.IFeedManager;
+import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -44,7 +45,7 @@
private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
/** The type associated with the ADM data output from the feed adaptor */
- private final IAType outptuType;
+ private final IAType outputType;
/** unique identifier for a feed instance. */
private final FeedConnectionId feedId;
@@ -58,12 +59,39 @@
/** The (singleton) instance of IFeedManager **/
private IFeedManager feedManager;
+ /** The library that contains the adapter in use. **/
+ private String adapterLibraryName;
+
+ /**
+ * The adapter factory class that is used to create an instance of the feed adapter.
+ * This value is used only in the case of external adapters.
+ **/
+ private String adapterFactoryClassName;
+
+ /** The configuration parameters associated with the adapter. **/
+ private Map<String, String> adapterConfiguration;
+
+ private ARecordType adapterOutputType;
+
public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, IAdapterFactory adapterFactory,
ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicy) {
super(spec, 0, 1);
recordDescriptors[0] = rDesc;
this.adapterFactory = adapterFactory;
- this.outptuType = atype;
+ this.outputType = atype;
+ this.feedId = feedId;
+ this.feedPolicy = feedPolicy;
+ }
+
+ public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, String adapterLibraryName,
+ String adapterFactoryClassName, Map<String, String> configuration, ARecordType atype,
+ RecordDescriptor rDesc, Map<String, String> feedPolicy) {
+ super(spec, 0, 1);
+ recordDescriptors[0] = rDesc;
+ this.adapterFactoryClassName = adapterFactoryClassName;
+ this.adapterConfiguration = configuration;
+ this.adapterLibraryName = adapterLibraryName;
+ this.outputType = atype;
this.feedId = feedId;
this.feedPolicy = feedPolicy;
}
@@ -80,7 +108,7 @@
try {
if (ingestionRuntime == null) {
// create an instance of a feed adaptor to ingest data.
- adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+ adapter = createAdapter(ctx, partition);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Beginning new feed:" + feedId);
}
@@ -113,10 +141,68 @@
}
public IAType getOutputType() {
- return outptuType;
+ return outputType;
}
public RecordDescriptor getRecordDescriptor() {
return recordDescriptors[0];
}
+
+ private IFeedAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+ IFeedAdapter feedAdapter = null;
+ if (adapterFactory != null) {
+ feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+ } else {
+ ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
+ adapterLibraryName);
+ if (classLoader != null) {
+ IAdapterFactory adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adapterFactoryClassName)
+ .newInstance()));
+
+ switch (adapterFactory.getAdapterType()) {
+ case TYPED: {
+ ((ITypedAdapterFactory) adapterFactory).configure(adapterConfiguration);
+ feedAdapter = (IFeedAdapter) ((ITypedAdapterFactory) adapterFactory).createAdapter(ctx,
+ partition);
+ }
+ break;
+ case GENERIC: {
+ String outputTypeName = adapterConfiguration.get(IGenericAdapterFactory.KEY_TYPE_NAME);
+ if (outputTypeName == null) {
+ throw new IllegalArgumentException(
+ "You must specify the datatype associated with the incoming data. Datatype is specified by the "
+ + IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
+ }
+ ((IGenericAdapterFactory) adapterFactory).configure(adapterConfiguration,
+ (ARecordType) adapterOutputType);
+ ((IGenericAdapterFactory) adapterFactory).createAdapter(ctx, partition);
+ }
+ break;
+ }
+
+ feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+ } else {
+ String message = "Unable to create adapter as class loader not configured for library "
+ + adapterLibraryName + " in dataverse " + feedId.getDataverse();
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(message);
+ }
+ throw new IllegalArgumentException(message);
+
+ }
+ }
+ return feedAdapter;
+ }
+
+ public String getAdapterLibraryName() {
+ return adapterLibraryName;
+ }
+
+ public String getAdapterFactoryClassName() {
+ return adapterFactoryClassName;
+ }
+
+ public Map<String, String> getAdapterConfiguration() {
+ return adapterConfiguration;
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index e3d5bed..535c6d8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -38,6 +38,7 @@
import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
@@ -79,9 +80,15 @@
IOperatorDescriptor opDesc = entry.getValue();
if (opDesc instanceof FeedIntakeOperatorDescriptor) {
FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
- FeedIntakeOperatorDescriptor fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(),
- orig.getAdapterFactory(), (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(),
- orig.getFeedPolicy());
+ FeedIntakeOperatorDescriptor fiop;
+ if (orig.getAdapterFactory() != null) {
+ fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterFactory(),
+ (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
+ } else {
+ fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterLibraryName(),
+ orig.getAdapterFactoryClassName(), orig.getAdapterConfiguration(),
+ (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
+ }
oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
} else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
@@ -196,7 +203,7 @@
}
- public static Pair<IAdapterFactory, ARecordType> getFeedFactoryAndOutput(Feed feed,
+ public static Triple<IAdapterFactory, ARecordType, AdapterType> getFeedFactoryAndOutput(Feed feed,
MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
String adapterName = null;
@@ -204,7 +211,7 @@
String adapterFactoryClassname = null;
IAdapterFactory adapterFactory = null;
ARecordType adapterOutputType = null;
- Pair<IAdapterFactory, ARecordType> feedProps = null;
+ Triple<IAdapterFactory, ARecordType, AdapterType> feedProps = null;
try {
adapterName = feed.getAdaptorName();
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
@@ -258,7 +265,7 @@
throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
}
- feedProps = Pair.of(adapterFactory, adapterOutputType);
+ feedProps = new Triple(adapterFactory, adapterOutputType, adapterEntity.getType());
} catch (Exception e) {
e.printStackTrace();
throw new AlgebricksException("unable to create adapter " + e);
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 41b1915..8f24dec 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -1,5 +1,5 @@
/*
-x * Copyright 2009-2013 by The Regents of the University of California
+ * Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from