[ASTERIXDB-3259][MTD] Include 'database' in DataSourceId & EntityId
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Include 'database' in DataSourceId, DataSourceIndex, and EntityId.
- Change 'Feed' tuple translator to handle 'database' value and
pass it to EntityId.
Change-Id: Icf67d0950810fe0706e16e5bf27f2f9ceda703c6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17813
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
index 1d33961..fa82d37 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
@@ -28,18 +28,24 @@
*/
public class EntityId implements Serializable {
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
private final String extensionName;
+ private final String databaseName;
private final DataverseName dataverseName;
private final String entityName;
- public EntityId(String extentionName, DataverseName dataverseName, String entityName) {
- this.extensionName = extentionName;
+ public EntityId(String extensionName, String databaseName, DataverseName dataverseName, String entityName) {
+ this.extensionName = extensionName;
+ this.databaseName = databaseName;
this.dataverseName = dataverseName;
this.entityName = entityName;
}
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
public DataverseName getDataverseName() {
return dataverseName;
}
@@ -57,13 +63,13 @@
return true;
}
EntityId other = (EntityId) o;
- return Objects.equals(other.dataverseName, dataverseName) && Objects.equals(other.entityName, entityName)
- && Objects.equals(other.extensionName, extensionName);
+ return Objects.equals(other.databaseName, databaseName) && Objects.equals(other.dataverseName, dataverseName)
+ && Objects.equals(other.entityName, entityName) && Objects.equals(other.extensionName, extensionName);
}
@Override
public int hashCode() {
- return Objects.hash(dataverseName, entityName, extensionName);
+ return Objects.hash(databaseName, dataverseName, entityName, extensionName);
}
@Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 25c97cc..58f85ee 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -366,7 +366,7 @@
context.getOutputTypeEnvironment(currentTop),
index.getIndexDetails().isOverridingKeyFieldTypes());
}
- DataSourceIndex dataSourceIndex = new DataSourceIndex(index, dataverseName, datasetName, mp);
+ DataSourceIndex dataSourceIndex = new DataSourceIndex(index, database, dataverseName, datasetName, mp);
// Introduce the TokenizeOperator only when doing bulk-load,
// and index type is keyword or n-gram.
@@ -623,7 +623,7 @@
beforeOpFilterExpression = createAnyUnknownFilterExpression(originalKeyVarList,
context.getOutputTypeEnvironment(originalAssignCoordinates), forceFilter);
}
- DataSourceIndex dataSourceIndex = new DataSourceIndex(index, dataverseName, datasetName, mp);
+ DataSourceIndex dataSourceIndex = new DataSourceIndex(index, database, dataverseName, datasetName, mp);
indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
OperatorManipulationUtil
.cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 90c5fad..0949c44 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -260,9 +260,9 @@
AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
jobGenParams.readFromFuncArgs(f.getArguments());
MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
- DataSourceId dataSourceId =
- new DataSourceId(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
String database = MetadataUtil.resolveDatabase(null, jobGenParams.getDataverseName());
+ DataSourceId dataSourceId =
+ new DataSourceId(database, jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
Dataset dataset = mp.findDataset(database, jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
IDataSourceIndex<String, DataSourceId> dsi =
mp.findDataSourceIndex(jobGenParams.getIndexName(), dataSourceId);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
index defb383..67c8423 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -1001,7 +1001,7 @@
/**
* In case of a left outer join we look for a special GroupBy above the join operator
- * (see {@link IntroduceJoinAccessMethodRule#checkAndApplyJoinTransformation(Mutable, IOptimizationContext)}.
+ * (see {@link IntroduceJoinAccessMethodRule#checkAndApplyJoinTransformation(Mutable, IOptimizationContext, boolean)}.
* A "Special GroupBy" is a GroupBy that eliminates unjoined duplicates that might be produced by the secondary
* index probe. We probe secondary indexes on each index partition and return a tuple with a right branch variable
* set to MISSING (or NULL) if there's no match on that partition. Therefore if there's more than one partition
@@ -2049,7 +2049,8 @@
unnestOp.setExecutionMode(ExecutionMode.PARTITIONED);
//set the physical operator
- DataSourceId dataSourceId = new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName());
+ DataSourceId dataSourceId =
+ new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName());
unnestOp.setPhysicalOperator(new ExternalDataLookupPOperator(dataSourceId, dataset, recordType, primaryKeyVars,
false, retainInput, retainNull));
return unnestOp;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
index a751aaf..a6f9005 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
@@ -231,7 +231,9 @@
return null;
}
- DataSourceId dsid = new DataSourceId(DataverseName.createFromCanonicalForm(dataverse), dataset);
+ DataverseName dataverseName = DataverseName.createFromCanonicalForm(dataverse);
+ String database = MetadataUtil.resolveDatabase(null, dataverseName);
+ DataSourceId dsid = new DataSourceId(database, dataverseName, dataset);
MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
return metadataProvider.findDataSource(dsid);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 09d5792..88be2d8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -734,7 +734,7 @@
throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
"Cannot write output to an external " + dataset());
}
- DataSourceId sourceId = new DataSourceId(dataverseName, datasetName);
+ DataSourceId sourceId = new DataSourceId(database, dataverseName, datasetName);
String itemTypeDatabase = MetadataUtil.resolveDatabase(null, dataset.getItemTypeDataverseName());
String metaItemTypeDatabase = MetadataUtil.resolveDatabase(null, dataset.getMetaItemTypeDataverseName());
IAType itemType = metadataProvider.findType(itemTypeDatabase, dataset.getItemTypeDataverseName(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
index d6abf55..bbdde88 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
@@ -99,7 +99,8 @@
}
variables.add(unnest.getVariable());
- DataSourceId dsid = new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName());
+ DataSourceId dsid =
+ new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName());
DataSource dataSource = metadataProvider.findDataSource(dsid);
boolean hasMeta = dataSource.hasMeta();
if (hasMeta) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
index 035765f..66d8190 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -85,9 +85,9 @@
String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4);
String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
- DataSourceId asid = new DataSourceId(dataverseName, getTargetFeed);
- String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
String database = MetadataUtil.resolveDatabase(null, dataverseName);
+ DataSourceId asid = new DataSourceId(database, dataverseName, getTargetFeed);
+ String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
FeedPolicyEntity policy = metadataProvider.findFeedPolicy(database, dataverseName, policyName);
if (policy == null) {
policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index 5178c3e..b3889fd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -135,7 +135,7 @@
}
private static DataSourceId createQueryIndexDataSourceId(Dataset dataset, String indexName) {
- return new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName(),
+ return new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(),
new String[] { indexName, QueryIndexRewriter.QUERY_INDEX.getName() });
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 574448d..17988cf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3872,7 +3872,7 @@
Map<String, String> configuration = cfs.getConfiguration();
ExternalDataUtils.normalize(configuration);
ExternalDataUtils.validate(configuration);
- feed = new Feed(dataverseName, feedName, configuration);
+ feed = new Feed(database, dataverseName, feedName, configuration);
FeedMetadataUtil.validateFeed(feed, mdTxnCtx, appCtx, warningCollector);
MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -4060,7 +4060,7 @@
try {
metadataProvider.setMetadataTxnContext(mdTxnCtx);
// Runtime handler
- EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName);
+ EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, feedName);
// Feed & Feed Connections
Feed feed = FeedMetadataUtil.validateIfFeedExists(database, dataverseName, feedName,
metadataProvider.getMetadataTxnContext());
@@ -4106,8 +4106,9 @@
StopFeedStatement sfst = (StopFeedStatement) stmt;
SourceLocation sourceLoc = sfst.getSourceLocation();
DataverseName dataverseName = getActiveDataverseName(sfst.getDataverseName());
+ String database = MetadataUtil.resolveDatabase(null, dataverseName);
String feedName = sfst.getFeedName().getValue();
- EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName);
+ EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, feedName);
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
// Obtain runtime info from ActiveListener
@@ -4208,7 +4209,7 @@
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
// Check whether feed is alive
ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler
- .getListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName));
+ .getListener(new EntityId(Feed.EXTENSION_NAME, database, dataverseName, feedName));
if (listener != null && listener.isActive()) {
throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, sourceLoc,
feedName);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 93da8c5..baecf79 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -86,7 +86,7 @@
static DataverseName dataverseName = MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME;
static String database = MetadataUtil.databaseFor(dataverseName);
static String entityName = "entityName";
- static EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, entityName);
+ static EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, entityName);
static Dataset firstDataset;
static Dataset secondDataset;
static List<Dataset> allDatasets;
@@ -1523,7 +1523,7 @@
TestEventsListener[] additionalListeners = new TestEventsListener[3];
for (int i = 0; i < additionalListeners.length; i++) {
String entityName = "entityName" + i;
- EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, entityName);
+ EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, entityName);
ClusterControllerService ccService = Mockito.mock(ClusterControllerService.class);
CCServiceContext ccServiceCtx = Mockito.mock(CCServiceContext.class);
CcApplicationContext ccAppCtx = Mockito.mock(CcApplicationContext.class);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index cb123bf..e17a7fe 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -42,6 +42,7 @@
import org.apache.asterix.app.result.ResponsePrinter;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -74,8 +75,9 @@
@Test
public void refreshStatsTest() throws Exception {
// Entities to be used
- EntityId entityId =
- new EntityId("MockExtension", DataverseName.createSinglePartName("MockDataverse"), "MockEntity");
+ DataverseName mockDataverse = DataverseName.createSinglePartName("MockDataverse");
+ String mockDatabase = MetadataUtil.resolveDatabase(null, mockDataverse);
+ EntityId entityId = new EntityId("MockExtension", mockDatabase, mockDataverse, "MockEntity");
ActiveRuntimeId activeRuntimeId =
new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), 0);
List<Dataset> datasetList = new ArrayList<>();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
index dd1d1b7..de3fa57 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
@@ -28,6 +28,8 @@
public String getFeedName();
+ public String getDatabaseName();
+
public DataverseName getDataverseName();
public EntityId getFeedId();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
index 73edc6e..a2f3b10 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
@@ -42,8 +42,8 @@
this.hash = toString().hashCode();
}
- public FeedConnectionId(DataverseName dataverseName, String feedName, String datasetName) {
- this(new EntityId(FEED_EXTENSION_NAME, dataverseName, feedName), datasetName);
+ public FeedConnectionId(String databaseName, DataverseName dataverseName, String feedName, String datasetName) {
+ this(new EntityId(FEED_EXTENSION_NAME, databaseName, dataverseName, feedName), datasetName);
}
public EntityId getFeedId() {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 0647763..dbc5714 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -85,7 +85,8 @@
public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, ITypedAdapterFactory adapterFactory,
ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) {
super(spec, 0, 1);
- this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+ this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDatabaseName(), primaryFeed.getDataverseName(),
+ primaryFeed.getFeedName());
this.adaptorFactory = adapterFactory;
this.adapterOutputType = adapterOutputType;
this.policyAccessor = policyAccessor;
@@ -96,7 +97,8 @@
String adapterLibraryName, String adapterFactoryClassName, ARecordType adapterOutputType,
FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) {
super(spec, 0, 1);
- this.feedId = new EntityId(FEED_EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName());
+ this.feedId =
+ new EntityId(FEED_EXTENSION_NAME, feed.getDatabaseName(), feed.getDataverseName(), feed.getFeedName());
this.adaptorFactoryClassName = adapterFactoryClassName;
this.adaptorLibraryDataverse = adapterLibraryDataverse;
this.adaptorLibraryName = adapterLibraryName;
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index 30d3266..a72c1bf 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -34,6 +34,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -72,8 +73,9 @@
private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer,
FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException, AsterixException {
DataverseName dvName = DataverseName.createSinglePartName(DATAVERSE_NAME);
+ String dbName = MetadataUtil.databaseFor(dvName);
FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class);
- EntityId feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dvName, FEED);
+ EntityId feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dbName, dvName, FEED);
FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET);
ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.COLLECT.toString(), 0);
return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, fpa, fta, framePool);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java
index c27ac42..b60682c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java
@@ -26,6 +26,8 @@
public final class DataSourceId {
+ private final String databaseName;
+
private final DataverseName dataverseName;
private final String datasourceName;
@@ -35,13 +37,15 @@
/**
* The original constructor taking
*
+ * @param databaseName
+ * the database name
* @param dataverseName
- * the dataverse (namespace) for this datasource
+ * the dataverse (namespace) for this datasource
* @param datasourceName
- * the name for this datasource
+ * the name for this datasource
*/
- public DataSourceId(DataverseName dataverseName, String datasourceName) {
- this(dataverseName, datasourceName, null);
+ public DataSourceId(String databaseName, DataverseName dataverseName, String datasourceName) {
+ this(databaseName, dataverseName, datasourceName, null);
}
/**
@@ -50,7 +54,8 @@
* that would expose different behavior. It enables the definition of (compile-time) parameterized datasources.
* Please note that the first 2 parameters still need to be 1) a dataverse name and 2) a datasource name.
*/
- public DataSourceId(DataverseName dataverseName, String datasourceName, String[] parameters) {
+ public DataSourceId(String databaseName, DataverseName dataverseName, String datasourceName, String[] parameters) {
+ this.databaseName = databaseName;
this.dataverseName = dataverseName;
this.datasourceName = datasourceName;
this.parameters = parameters;
@@ -61,6 +66,10 @@
return dataverseName + "." + datasourceName + (parameters != null ? "." + String.join(".", parameters) : "");
}
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
public DataverseName getDataverseName() {
return dataverseName;
}
@@ -78,13 +87,13 @@
return false;
}
DataSourceId that = (DataSourceId) o;
- return dataverseName.equals(that.dataverseName) && datasourceName.equals(that.datasourceName)
- && Arrays.equals(parameters, that.parameters);
+ return Objects.equals(databaseName, that.databaseName) && dataverseName.equals(that.dataverseName)
+ && datasourceName.equals(that.datasourceName) && Arrays.equals(parameters, that.parameters);
}
@Override
public int hashCode() {
- int result = Objects.hash(dataverseName, datasourceName);
+ int result = Objects.hash(databaseName, dataverseName, datasourceName);
result = 31 * result + Arrays.hashCode(parameters);
return result;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java
index 05498b9..80ca036 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java
@@ -27,15 +27,17 @@
public class DataSourceIndex implements IDataSourceIndex<String, DataSourceId> {
private final Index index;
+ private final String database;
private final DataverseName dataverseName;
private final String datasetName;
private final MetadataProvider metadataProvider;
// Every transactions needs to work with its own instance of an
// MetadataProvider.
- public DataSourceIndex(Index index, DataverseName dataverseName, String datasetName,
+ public DataSourceIndex(Index index, String database, DataverseName dataverseName, String datasetName,
MetadataProvider metadataProvider) {
this.index = index;
+ this.database = database;
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.metadataProvider = metadataProvider;
@@ -45,7 +47,7 @@
@Override
public IDataSource<DataSourceId> getDataSource() {
try {
- DataSourceId sourceId = new DataSourceId(dataverseName, datasetName);
+ DataSourceId sourceId = new DataSourceId(database, dataverseName, datasetName);
return metadataProvider.lookupSourceInMetadata(sourceId);
} catch (Exception me) {
return null;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index d975404..628c745 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -191,8 +191,8 @@
throw new AlgebricksException("Feed not configured with a policy");
}
feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
- FeedConnectionId feedConnectionId =
- new FeedConnectionId(getId().getDataverseName(), getId().getDatasourceName(), getTargetDataset());
+ FeedConnectionId feedConnectionId = new FeedConnectionId(getId().getDatabaseName(),
+ getId().getDataverseName(), getId().getDatasourceName(), getTargetDataset());
FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 690338c..683bf0f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -26,6 +26,7 @@
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.MetadataUtil;
import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
@@ -135,6 +136,7 @@
}
protected static DataSourceId createDataSourceId(FunctionIdentifier fid, String... parameters) {
- return new DataSourceId(FunctionSignature.getDataverseName(fid), fid.getName(), parameters);
+ return new DataSourceId(MetadataUtil.resolveDatabase(null, FunctionSignature.getDataverseName(fid)),
+ FunctionSignature.getDataverseName(fid), fid.getName(), parameters);
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index 5fa2af4..20505bb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.metadata.entities.Dataset;
@@ -64,8 +65,9 @@
public LoadableDataSource(Dataset targetDataset, IAType itemType, IAType metaItemType, String adapter,
Map<String, String> properties) throws AlgebricksException, IOException {
- super(new DataSourceId(DataverseName.createSinglePartName(LOADABLE_DV), LOADABLE_DS), itemType, metaItemType,
- Type.LOADABLE, null);
+ super(new DataSourceId(MetadataUtil.databaseFor(DataverseName.createSinglePartName(LOADABLE_DV)),
+ DataverseName.createSinglePartName(LOADABLE_DV), LOADABLE_DS), itemType, metaItemType, Type.LOADABLE,
+ null);
this.targetDataset = targetDataset;
this.adapter = adapter;
this.adapterProperties = properties;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index a900366..3a9f6c1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -434,7 +434,7 @@
String database = dataset.getDatabaseName();
String datasetName = dataset.getDatasetName();
Index index = getIndex(database, dataverseName, datasetName, indexId);
- return index != null ? new DataSourceIndex(index, dataverseName, datasetName, this) : null;
+ return index != null ? new DataSourceIndex(index, database, dataverseName, datasetName, this) : null;
}
public Index getIndex(String database, DataverseName dataverseName, String datasetName, String indexName)
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
index 3b2937b..ff6cffa 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
@@ -76,6 +76,7 @@
}
private static DataSourceId createSampleDataSourceId(Dataset dataset, String sampleIndexName) {
- return new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName(), new String[] { sampleIndexName });
+ return new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(),
+ new String[] { sampleIndexName });
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java
index e78b7d4..5796df9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java
@@ -41,8 +41,9 @@
/** Feed configurations */
private final Map<String, String> feedConfiguration;
- public Feed(DataverseName dataverseName, String feedName, Map<String, String> feedConfiguration) {
- this.feedId = new EntityId(EXTENSION_NAME, dataverseName, feedName);
+ public Feed(String databaseName, DataverseName dataverseName, String feedName,
+ Map<String, String> feedConfiguration) {
+ this.feedId = new EntityId(EXTENSION_NAME, databaseName, dataverseName, feedName);
this.displayName = "(" + feedId + ")";
this.feedConfiguration = feedConfiguration;
}
@@ -53,6 +54,11 @@
}
@Override
+ public String getDatabaseName() {
+ return feedId.getDatabaseName();
+ }
+
+ @Override
public DataverseName getDataverseName() {
return feedId.getDataverseName();
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
index 06ffce4..a0a288f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
@@ -58,7 +58,7 @@
this.policyName = policyName;
this.whereClauseBody = whereClauseBody == null ? "" : whereClauseBody;
this.outputType = outputType;
- this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dataverseName, feedName);
+ this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, databaseName, dataverseName, feedName);
}
public List<FunctionSignature> getAppliedFunctions() {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
index bb1c9ad..0af8298 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
@@ -28,6 +28,7 @@
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.builders.UnorderedListBuilder;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
import org.apache.asterix.metadata.bootstrap.FeedEntity;
import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
import org.apache.asterix.metadata.entities.Feed;
@@ -59,6 +60,13 @@
String dataverseCanonicalName =
((AString) feedRecord.getValueByPos(feedEntity.dataverseNameIndex())).getStringValue();
DataverseName dataverseName = DataverseName.createFromCanonicalForm(dataverseCanonicalName);
+ int databaseNameIndex = feedEntity.databaseNameIndex();
+ String databaseName;
+ if (databaseNameIndex >= 0) {
+ databaseName = ((AString) feedRecord.getValueByPos(databaseNameIndex)).getStringValue();
+ } else {
+ databaseName = MetadataUtil.databaseFor(dataverseName);
+ }
String feedName = ((AString) feedRecord.getValueByPos(feedEntity.feedNameIndex())).getStringValue();
AUnorderedList feedConfig = (AUnorderedList) feedRecord.getValueByPos(feedEntity.adapterConfigIndex());
@@ -74,7 +82,7 @@
adaptorConfiguration.put(key, value);
}
- return new Feed(dataverseName, feedName, adaptorConfiguration);
+ return new Feed(databaseName, dataverseName, feedName, adaptorConfiguration);
}
@Override
@@ -83,6 +91,12 @@
// write the key in the first two fields of the tuple
tupleBuilder.reset();
+
+ if (feedEntity.databaseNameIndex() >= 0) {
+ aString.setValue(feed.getDatabaseName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+ }
aString.setValue(dataverseCanonicalName);
stringSerde.serialize(aString, tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
@@ -93,6 +107,12 @@
recordBuilder.reset(feedEntity.getRecordType());
+ if (feedEntity.databaseNameIndex() >= 0) {
+ fieldValue.reset();
+ aString.setValue(feed.getDatabaseName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(feedEntity.databaseNameIndex(), fieldValue);
+ }
// write dataverse name
fieldValue.reset();
aString.setValue(dataverseCanonicalName);