fixed incorrect evaluation of the type associated with the output of a feed adapter
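
The adapter-factory and output-type resolution logic, previously duplicated between
ConnectFeedStatement.initialize() and AqlMetadataProvider.buildFeedIntakeRuntime(), is now
centralized in FeedUtil.getFeedFactoryAndOutput(). A minimal sketch of the call pattern
introduced by this patch (the variable names here are illustrative only):

    // resolve the configured adapter factory together with the record type it emits
    org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput =
            FeedUtil.getFeedFactoryAndOutput(feed, mdTxnCtx);
    IAdapterFactory adapterFactory = factoryOutput.getLeft();    // configured TYPED or GENERIC factory
    ARecordType adapterOutputType = factoryOutput.getRight();    // type of the records the adapter produces

Callers no longer have to inspect the applied function (or fall back to the target dataset's
item type) to determine the adapter's output type.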
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
index 3ee53a6..10f5c9e 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
@@ -32,6 +32,10 @@
import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
public class ConnectFeedStatement implements Statement {
@@ -71,62 +75,62 @@
public void initialize(MetadataTransactionContext mdTxnCtx, Dataset targetDataset, Feed sourceFeed)
throws MetadataException {
- query = new Query();
- FunctionSignature appliedFunction = sourceFeed.getAppliedFunction();
- Function function = null;
- String adaptorOutputType = null;
- if (appliedFunction != null) {
- function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
- if (function == null) {
- throw new MetadataException(" Unknown function " + function);
- } else {
- if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) {
- adaptorOutputType = targetDataset.getItemTypeName();
- } else {
- if (function.getParams().size() > 1) {
- throw new MetadataException(" Incompatible function: " + appliedFunction
- + " Number if arguments must be 1");
- }
- adaptorOutputType = function.getParams().get(0);
- }
- }
- } else {
- adaptorOutputType = targetDataset.getItemTypeName();
- }
- StringBuilder builder = new StringBuilder();
- builder.append("set" + " " + FunctionUtils.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n");
- builder.append("insert into dataset " + datasetName + " ");
+ query = new Query();
+ FunctionSignature appliedFunction = sourceFeed.getAppliedFunction();
+ Function function = null;
+ String adapterOutputType = null;
+ if (appliedFunction != null) {
+ function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
+ if (function == null) {
+ throw new MetadataException("Unknown function " + appliedFunction);
+ } else if (function.getParams().size() > 1) {
+ throw new MetadataException("Incompatible function: " + appliedFunction
+ + ". Number of arguments must be 1");
+ }
+ }
- if (appliedFunction == null) {
- builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adaptorOutputType + "'"
- + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
- builder.append(" return $x");
- } else {
- if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) {
- String param = function.getParams().get(0);
- builder.append(" (" + " for" + " " + param + " in feed-ingest ('" + feedName + "'" + "," + "'"
- + adaptorOutputType + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
- builder.append(" let $y:=(" + function.getFunctionBody() + ")" + " return $y");
- } else {
- builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adaptorOutputType
- + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
- builder.append(" let $y:=" + sourceFeed.getDataverseName() + "." + function.getName() + "(" + "$x"
- + ")");
- builder.append(" return $y");
- }
+ org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+ try {
+ factoryOutput = FeedUtil.getFeedFactoryAndOutput(sourceFeed, mdTxnCtx);
+ adapterOutputType = factoryOutput.getRight().getTypeName();
+ } catch (AlgebricksException ae) {
+ throw new MetadataException(ae);
+ }
- }
- builder.append(")");
- builder.append(";");
- AQLParser parser = new AQLParser(new StringReader(builder.toString()));
+ StringBuilder builder = new StringBuilder();
+ builder.append("set" + " " + FunctionUtils.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n");
+ builder.append("insert into dataset " + datasetName + " ");
- List<Statement> statements;
- try {
- statements = parser.Statement();
- query = ((InsertStatement) statements.get(1)).getQuery();
- } catch (ParseException pe) {
- throw new MetadataException(pe);
- }
+ if (appliedFunction == null) {
+ builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adapterOutputType + "'"
+ + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
+ builder.append(" return $x");
+ } else {
+ if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) {
+ String param = function.getParams().get(0);
+ builder.append(" (" + " for" + " " + param + " in feed-ingest ('" + feedName + "'" + "," + "'"
+ + adapterOutputType + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
+ builder.append(" let $y:=(" + function.getFunctionBody() + ")" + " return $y");
+ } else {
+ builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adapterOutputType
+ + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
+ builder.append(" let $y:=" + sourceFeed.getDataverseName() + "." + function.getName() + "(" + "$x"
+ + ")");
+ builder.append(" return $y");
+ }
+
+ }
+ builder.append(")");
+ builder.append(";");
+ AQLParser parser = new AQLParser(new StringReader(builder.toString()));
+
+ List<Statement> statements;
+ try {
+ statements = parser.Statement();
+ query = ((InsertStatement) statements.get(1)).getQuery();
+ } catch (ParseException pe) {
+ throw new MetadataException(pe);
+ }
}
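
For reference, when no function is applied to the feed, the builder above assembles an insert
query of roughly the following shape (dataset, feed, and type names are placeholders, and it is
assumed that FunctionUtils.IMPORT_PRIVATE_FUNCTIONS resolves to "import-private-functions"):

    set import-private-functions 'true';
    insert into dataset MyDataset ( for $x in feed-ingest ('MyFeed','MyOutputType','MyDataset') return $x);

The key difference from the removed code is that MyOutputType now comes from the adapter
factory itself rather than from the applied function's parameter or the dataset's item type.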
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 89368e7..b961158 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -60,6 +60,7 @@
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
import edu.uci.ics.asterix.metadata.feeds.EndFeedMessage;
import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
@@ -169,7 +170,7 @@
private final AsterixStorageProperties storageProperties;
- private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+ public static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
@@ -412,65 +413,30 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(JobSpecification jobSpec,
IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
- DatasourceAdapter adapterEntity;
- IAdapterFactory adapterFactory;
- IAType adapterOutputType;
- String adapterName;
- String adapterFactoryClassname;
FeedDataSource feedDataSource = (FeedDataSource) dataSource;
- try {
- adapterName = feedDataSource.getFeed().getAdaptorName();
- adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
- adapterName);
- if (adapterEntity != null) {
- adapterFactoryClassname = adapterEntity.getClassname();
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- } else {
- adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
- if (adapterFactoryClassname != null) {
- } else {
- adapterFactoryClassname = adapterName;
- }
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- }
-
- Map<String, String> configuration = feedDataSource.getFeed().getAdaptorConfiguration();
-
- switch (adapterFactory.getAdapterType()) {
- case TYPED:
- adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
- ((ITypedAdapterFactory) adapterFactory).configure(configuration);
- break;
- case GENERIC:
- String outputTypeName = configuration.get("output-type-name");
- adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
- feedDataSource.getDatasourceDataverse(), outputTypeName).getDatatype();
- ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
- break;
- default:
- throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to create adapter " + e);
- }
-
- ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
- .getSerializerDeserializer(adapterOutputType);
- RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
- FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
- BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
-
- feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
- FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
- feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
- .getFeedConnectionId().getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
- feedDesc, feedPolicy.getProperties());
-
+ FeedIntakeOperatorDescriptor feedIngestor = null;
+ org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
AlgebricksPartitionConstraint constraint = null;
+
try {
- constraint = adapterFactory.getPartitionConstraint();
+ factoryOutput = FeedUtil.getFeedFactoryAndOutput(feedDataSource.getFeed(), mdTxnCtx);
+ IAdapterFactory adapterFactory = factoryOutput.getLeft();
+ ARecordType adapterOutputType = factoryOutput.getRight();
+
+ ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
+ .getSerializerDeserializer(adapterOutputType);
+ RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+ FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
+ BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+
+ feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
+ feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
+ .getFeedConnectionId().getDatasetName()), adapterFactory, adapterOutputType,
+ feedDesc, feedPolicy.getProperties());
+
+ constraint = factoryOutput.getLeft().getPartitionConstraint();
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -1338,11 +1304,13 @@
}
/**
- * Calculate an estimate size of the bloom filter. Note that this is an estimation which assumes that the data
- * is going to be uniformly distributed across all partitions.
+ * Calculate an estimated size of the bloom filter. Note that this is an
+ * estimate which assumes that the data is going to be uniformly
+ * distributed across all partitions.
*
* @param dataset
- * @return Number of elements that will be used to create a bloom filter per dataset per partition
+ * @return Number of elements that will be used to create a bloom filter per
+ * dataset per partition
* @throws MetadataException
* @throws AlgebricksException
*/
@@ -1568,13 +1536,14 @@
*
* @param properties
* the original dataset properties
- * @return a new map containing the original dataset properties and the scheduler/locations
+ * @return a new map containing the original dataset properties and the
+ * scheduler/locations
*/
private Map<String, Object> wrapProperties(Map<String, String> properties) {
Map<String, Object> wrappedProperties = new HashMap<String, Object>();
wrappedProperties.putAll(properties);
- //wrappedProperties.put(SCHEDULER, hdfsScheduler);
- //wrappedProperties.put(CLUSTER_LOCATIONS, getClusterLocations());
+ // wrappedProperties.put(SCHEDULER, hdfsScheduler);
+ // wrappedProperties.put(CLUSTER_LOCATIONS, getClusterLocations());
return wrappedProperties;
}
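
Note the visibility change in the second hunk above: adapterFactoryMapping is widened from
private to public so that FeedUtil, which now owns factory resolution, can look up built-in
adapter aliases. A narrower alternative, sketched here only as a hypothetical design option and
not as part of this patch, would keep the map private behind a static lookup:

    // hypothetical alternative: expose a lookup method instead of the whole map
    public static String getAdapterFactoryClassname(String adapterName) {
        return adapterFactoryMapping.get(adapterName);
    }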
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
similarity index 77%
rename from asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index c01664a..2ac657f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.asterix.file;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.ArrayList;
import java.util.HashMap;
@@ -11,15 +11,18 @@
import org.apache.commons.lang3.tuple.Pair;
import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
-import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedPolicyAccessor;
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
@@ -188,5 +191,56 @@
return altered;
}
+
+
+ public static Pair<IAdapterFactory, ARecordType> getFeedFactoryAndOutput(Feed feed,
+ MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
+
+ String adapterName = null;
+ DatasourceAdapter adapterEntity = null;
+ String adapterFactoryClassname = null;
+ IAdapterFactory adapterFactory = null;
+ ARecordType adapterOutputType = null;
+ Pair<IAdapterFactory, ARecordType> feedProps = null;
+ try {
+ adapterName = feed.getAdaptorName();
+ adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+ adapterName);
+ if (adapterEntity != null) {
+ adapterFactoryClassname = adapterEntity.getClassname();
+ adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ } else {
+ adapterFactoryClassname = AqlMetadataProvider.adapterFactoryMapping.get(adapterName);
+ // if the adapter name is not a registered alias, treat it as a factory class name
+ if (adapterFactoryClassname == null) {
+ adapterFactoryClassname = adapterName;
+ }
+ adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ }
+
+ Map<String, String> configuration = feed.getAdaptorConfiguration();
+
+ switch (adapterFactory.getAdapterType()) {
+ case TYPED:
+ adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
+ ((ITypedAdapterFactory) adapterFactory).configure(configuration);
+ break;
+ case GENERIC:
+ String outputTypeName = configuration.get("output-type-name");
+ adapterOutputType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
+ feed.getDataverseName(), outputTypeName).getDatatype();
+ ((IGenericAdapterFactory) adapterFactory).configure(configuration, adapterOutputType);
+ break;
+ default:
+ throw new IllegalStateException("Unknown factory type for " + adapterFactoryClassname);
+ }
+
+ feedProps = Pair.of(adapterFactory, adapterOutputType);
+ } catch (Exception e) {
+ // chain the root cause instead of printing the stack trace and discarding it
+ throw new AlgebricksException("Unable to create adapter: " + e.getMessage(), e);
+ }
+ return feedProps;
+ }
}
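
As the GENERIC branch of getFeedFactoryAndOutput() shows, a generic adapter resolves its output
type from the "output-type-name" key of the adapter configuration, so a feed defined over such
an adapter must supply that key. A rough sketch of such a feed definition (feed, adapter, and
type names are hypothetical, and the exact DDL syntax may vary across versions):

    create feed MyFeed using my_generic_adapter (("output-type-name"="MyOutputType"));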