checkpoint
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index ab3516a..d7c65c8 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -31,6 +31,7 @@
import edu.uci.ics.asterix.optimizer.rules.InlineUnnestFunctionRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceDynamicTypeCastRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceEnforcedListTypeRule;
+import edu.uci.ics.asterix.optimizer.rules.IntroduceFeedInterceptOperatorRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceInstantLockSearchCallbackRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
@@ -209,6 +210,7 @@
accessMethod.add(new RemoveUnusedOneToOneEquiJoinRule());
accessMethod.add(new PushSimilarityFunctionsBelowJoin());
accessMethod.add(new RemoveUnusedAssignAndAggregateRule());
+ accessMethod.add(new IntroduceFeedInterceptOperatorRule());
return accessMethod;
}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
index cfb767f..2953e42 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -179,6 +179,54 @@
return true;
}
+
+ if (fid.equals(AsterixBuiltinFunctions.FEED_INTERCEPT)) {
+ if (unnest.getPositionalVariable() != null) {
+ throw new AlgebricksException("No positional variables are allowed over datasets.");
+ }
+ ILogicalExpression expr = f.getArguments().get(0).getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ return false;
+ }
+ ConstantExpression ce = (ConstantExpression) expr;
+ IAlgebricksConstantValue acv = ce.getValue();
+ if (!(acv instanceof AsterixConstantValue)) {
+ return false;
+ }
+ AsterixConstantValue acv2 = (AsterixConstantValue) acv;
+ if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
+ return false;
+ }
+ String datasetArg = ((AString) acv2.getObject()).getStringValue();
+
+ AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
+ Pair<String, String> datasetReference = parseDatasetReference(metadataProvider, datasetArg);
+ String dataverseName = datasetReference.first;
+ String datasetName = datasetReference.second;
+ Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("Could not find dataset " + datasetName);
+ }
+
+ if (dataset.getDatasetType() != DatasetType.FEED) {
+ throw new IllegalArgumentException("invalid dataset type:" + dataset.getDatasetType());
+ }
+
+ AqlSourceId asid = new AqlSourceId(dataverseName, datasetName);
+ ArrayList<LogicalVariable> v = new ArrayList<LogicalVariable>();
+ v.add(unnest.getVariable());
+
+ DataSourceScanOperator scan = new DataSourceScanOperator(v, createDummyFeedInterceptDataSource(asid,
+ dataset, metadataProvider));
+
+ List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+ scanInpList.addAll(unnest.getInputs());
+ opRef.setValue(scan);
+ addPrimaryKey(v, context);
+ context.computeAndSetTypeEnvironmentForOperator(scan);
+
+ return true;
+ }
}
return false;
@@ -208,6 +256,20 @@
return extDataSource;
}
+ private AqlDataSource createDummyFeedInterceptDataSource(AqlSourceId aqlId, Dataset dataset,
+ AqlMetadataProvider metadataProvider) throws AlgebricksException {
+ if (!aqlId.getDataverseName().equals(
+ metadataProvider.getDefaultDataverse() == null ? null : metadataProvider.getDefaultDataverse()
+ .getDataverseName())) {
+ return null;
+ }
+ String tName = dataset.getItemTypeName();
+ IAType itemType = metadataProvider.findType(dataset.getDataverseName(), tName);
+ ExternalFeedDataSource extDataSource = new ExternalFeedDataSource(aqlId, dataset, itemType,
+ AqlDataSource.AqlDataSourceType.FEED_INTERCEPT);
+ return extDataSource;
+ }
+
private Pair<String, String> parseDatasetReference(AqlMetadataProvider metadataProvider, String datasetArg)
throws AlgebricksException {
String[] datasetNameComponents = datasetArg.split("\\.");
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 2292cdf..f1d1dfa 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -94,6 +94,7 @@
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
@@ -639,7 +640,7 @@
if (ds.getDatasetType().equals(DatasetType.FEED)) {
FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
null);
- boolean activeFeed = FeedOperations.isFeedActive(fa);
+ boolean activeFeed = FeedUtil.isFeedActive(fa);
if (activeFeed) {
throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
+ " Operation not supported.");
@@ -923,7 +924,7 @@
if (ds.getDatasetType().equals(DatasetType.FEED)) {
FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
null);
- boolean activeFeed = FeedOperations.isFeedActive(fa);
+ boolean activeFeed = FeedUtil.isFeedActive(fa);
if (activeFeed) {
throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
+ " Operation not supported.");
@@ -1041,7 +1042,7 @@
if (ds.getDatasetType().equals(DatasetType.FEED)) {
FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
null);
- boolean activeFeed = FeedOperations.isFeedActive(fa);
+ boolean activeFeed = FeedUtil.isFeedActive(fa);
if (activeFeed) {
throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
+ " Operation not supported.");
@@ -1417,7 +1418,7 @@
FeedActivity recentActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, bfs
.getDatasetName().getValue(), null);
- boolean isFeedActive = FeedOperations.isFeedActive(recentActivity);
+ boolean isFeedActive = FeedUtil.isFeedActive(recentActivity);
if (isFeedActive && !bfs.isForceBegin()) {
throw new AsterixException("Feed " + bfs.getDatasetName().getValue()
+ " is currently ACTIVE. Operation not supported");
@@ -1480,7 +1481,7 @@
FeedActivity feedActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName,
datasetName, FeedActivityType.FEED_BEGIN, FeedActivityType.FEED_RESUME);
- boolean isFeedActive = FeedOperations.isFeedActive(feedActivity);
+ boolean isFeedActive = FeedUtil.isFeedActive(feedActivity);
if (!isFeedActive) {
throw new AsterixException("Feed " + cfs.getDatasetName().getValue()
+ " is currently INACTIVE. Operation not supported");
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index 024e660..503f565 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -71,10 +71,7 @@
}
}
- public static boolean isFeedActive(FeedActivity feedActivity) {
- return (feedActivity != null && (!feedActivity.getActivityType().equals(FeedActivityType.FEED_END) && !feedActivity
- .getActivityType().equals(FeedActivityType.FEED_FAILURE)));
- }
+
private static JobSpecification createSendMessageToFeedJobSpec(CompiledControlFeedStatement controlFeedStatement,
AqlMetadataProvider metadataProvider, FeedActivity feedActivity) throws AsterixException {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index e1eb8c4..921e93e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -126,8 +126,6 @@
@Override
public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
- IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, AsterixAppContextInfo
- .getInstance().getCCApplicationContext(), EnumSet.noneOf(JobFlag.class));
JobSpecification spec = acggf.getJobSpecification();
boolean feedIngestionJob = false;
FeedId feedId = null;
@@ -143,6 +141,10 @@
}
if (feedIngestionJob) {
feedJobNotificationHandler.registerFeed(feedId, jobId, spec, feedPolicy);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registered feed: " + feedId + " ingestion policy "
+ + feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
+ }
}
}
@@ -316,10 +318,8 @@
}
private void handleJobFinishMessage(FeedInfo feedInfo, Message message) {
-
MetadataManager.INSTANCE.acquireWriteLatch();
MetadataTransactionContext mdTxnCtx = null;
-
try {
IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
JobInfo info = hcc.getJobInfo(message.jobId);
@@ -350,7 +350,6 @@
} finally {
MetadataManager.INSTANCE.releaseWriteLatch();
}
-
}
}
@@ -369,6 +368,19 @@
this.feedPolicy = feedPolicy;
}
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof FeedInfo)) {
+ return false;
+ }
+ return ((FeedInfo) o).feedId.equals(feedId);
+ }
+
+ @Override
+ public int hashCode() {
+ return feedId.hashCode();
+ }
+
}
@Override
@@ -540,6 +552,7 @@
FeedActivity fa = null;
Map<String, String> feedActivityDetails = new HashMap<String, String>();
StringBuilder builder = new StringBuilder();
+ MetadataManager.INSTANCE.acquireWriteLatch();
try {
ctx = MetadataManager.INSTANCE.beginTransaction();
for (Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
@@ -564,6 +577,8 @@
throw new IllegalStateException("Unable to abort transaction " + e2);
}
}
+ } finally {
+ MetadataManager.INSTANCE.releaseWriteLatch();
}
}
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index b1bf69d..801082b 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -64,7 +64,7 @@
// TODO: Uncomment when hadoop version is upgraded and adapters are
// ported.
- HDFSCluster.getInstance().setup();
+ // HDFSCluster.getInstance().setup();
// Set the node resolver to be the identity resolver that expects node names
// to be node controller ids; a valid assumption in test environment.
diff --git a/asterix-app/src/test/resources/hadoop/conf/core-site.xml b/asterix-app/src/test/resources/hadoop/conf/core-site.xml
index 5b1023c..433325a 100644
--- a/asterix-app/src/test/resources/hadoop/conf/core-site.xml
+++ b/asterix-app/src/test/resources/hadoop/conf/core-site.xml
@@ -21,7 +21,7 @@
<property>
<name>fs.default.name</name>
- <value>hdfs://127.0.0.1:31888</value>
+ <value>hdfs://192.168.0.103:31888</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
index b9ed170..68d03bf 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
@@ -307,8 +307,9 @@
JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
Marshaller marshaller = ctx.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
- marshaller.marshal(configuration, new FileOutputStream(asterixConfDir + File.separator
- + ASTERIX_CONFIGURATION_FILE));
+ FileOutputStream os = new FileOutputStream(asterixConfDir + File.separator + ASTERIX_CONFIGURATION_FILE);
+ marshaller.marshal(configuration, os);
+ os.close();
}
private static void writeAsterixLogConfigurationFile(AsterixInstance asterixInstance, Properties logProperties)
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/ZooKeeperService.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/ZooKeeperService.java
index 7063c76..5c059bb 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/ZooKeeperService.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/ZooKeeperService.java
@@ -109,7 +109,7 @@
msg.append("1) Managix is incorrectly configured. Please run " + "managix validate"
+ " to run a validation test and correct the errors reported.");
msg.append("\n2) If validation in (1) is successful, ensure that java_home parameter is set correctly in Managix configuration ("
- + null + File.separator + AsterixEventServiceUtil.MANAGIX_CONF_XML + ")");
+ + AsterixEventServiceUtil.MANAGIX_CONF_XML + ")");
throw new Exception(msg.toString());
}
msgQ.take();
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/VersionCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/VersionCommand.java
index 50e13ac..041839d 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/VersionCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/VersionCommand.java
@@ -23,7 +23,7 @@
@Override
protected void execCommand() throws Exception {
- InstallerDriver.initConfig(true);
+ InstallerDriver.initConfig(false);
String asterixZipName = AsterixEventService.getAsterixZip().substring(
AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
String asterixVersion = asterixZipName.substring("asterix-server-".length(),
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index e3e4cb7..d9c9d2f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -60,7 +60,8 @@
INTERNAL,
FEED,
EXTERNAL,
- EXTERNAL_FEED
+ EXTERNAL_FEED,
+ FEED_INTERCEPT
}
public AqlDataSource(AqlSourceId id, Dataset dataset, IAType itemType, AqlDataSourceType datasourceType)
@@ -76,6 +77,7 @@
initInternalDataset(itemType);
break;
}
+ case FEED_INTERCEPT:
case EXTERNAL_FEED:
case EXTERNAL: {
initExternalDataset(itemType);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 4eeb072..b3dbf48 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -52,7 +52,9 @@
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -60,7 +62,9 @@
import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedId;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedInterceptScanOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
@@ -323,11 +327,56 @@
String itemTypeName = dataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
.getDatatype();
- if (dataSource instanceof ExternalFeedDataSource) {
- return buildFeedIntakeRuntime(jobSpec, dataset, dataSource);
- } else {
- return buildExternalDataScannerRuntime(jobSpec, itemType,
- (ExternalDatasetDetails) dataset.getDatasetDetails(), NonTaggedDataFormat.INSTANCE);
+ switch (((AqlDataSource) dataSource).getDatasourceType()) {
+ case EXTERNAL_FEED:
+ return buildFeedIntakeRuntime(jobSpec, dataset, dataSource);
+ case EXTERNAL:
+ return buildExternalDataScannerRuntime(jobSpec, itemType,
+ (ExternalDatasetDetails) dataset.getDatasetDetails(), NonTaggedDataFormat.INSTANCE);
+ case FEED_INTERCEPT:
+ return buildFeedInterceptRuntime(jobSpec, dataset, dataSource);
+ default:
+ throw new IllegalStateException("Unknown aql datasource type: "
+ + ((AqlDataSource) dataSource).getDatasourceType());
+ }
+ }
+
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedInterceptRuntime(
+ JobSpecification jobSpec, Dataset dataset, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
+ FeedActivity feedActivity = null;
+ try {
+ feedActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), null);
+ if (!FeedUtil.isFeedActive(feedActivity)) {
+ throw new AlgebricksException("Source feed " + dataset.getDataverseName() + ":"
+ + dataset.getDatasetName() + " is not " + FeedState.ACTIVE);
+ }
+ String dataTypeName = dataset.getItemTypeName();
+ Datatype datatype = MetadataManager.INSTANCE
+ .getDatatype(mdTxnCtx, dataset.getDataverseName(), dataTypeName);
+ ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
+ .getSerializerDeserializer(datatype.getDatatype());
+
+ RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+ FeedInterceptScanOperatorDescriptor feedInterceptScan = new FeedInterceptScanOperatorDescriptor(jobSpec,
+ scannerDesc, new FeedId(dataset.getDataverseName(), dataset.getDatasetName()));
+
+ FeedActivity beginFeedActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx,
+ dataset.getDataverseName(), dataset.getDatasetName(), FeedActivityType.FEED_BEGIN);
+ String[] computeLocations = beginFeedActivity.getFeedActivityDetails()
+ .get(FeedActivityDetails.COMPUTE_LOCATIONS).split(",");
+ AlgebricksPartitionConstraint constraint;
+ try {
+ constraint = new AlgebricksAbsolutePartitionConstraint(computeLocations);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedInterceptScan, constraint);
+
+ } catch (Exception e) {
+ throw new AlgebricksException("Unable to create feed intercept scan runtime", e);
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index b3321cb..2b04363 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -31,6 +31,7 @@
}
private Map<FeedId, List<AdapterRuntimeManager>> activeFeedRuntimeManagers = new HashMap<FeedId, List<AdapterRuntimeManager>>();
+ private Map<FeedId, IFeedDistributor> activeFeedDistributors = new HashMap<FeedId, IFeedDistributor>();
@Override
public synchronized void registerFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr) {
@@ -67,8 +68,22 @@
return null;
}
+ @Override
+ public synchronized void registerSourceFeed(FeedId feedId, IFeedDistributor feedDistributor) {
+ activeFeedDistributors.put(feedId, feedDistributor);
+ }
+
+ @Override
+ public synchronized void deRegisterSourceFeed(FeedId feedId) {
+ activeFeedDistributors.remove(feedId);
+ }
+
public List<AdapterRuntimeManager> getFeedRuntimeManagers(FeedId feedId) {
return activeFeedRuntimeManagers.get(feedId);
}
+ public IFeedDistributor getFeedDistributor(FeedId feedId) {
+ return activeFeedDistributors.get(feedId);
+ }
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index 46302e8..cbabd7e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -36,4 +36,15 @@
*/
public AdapterRuntimeManager getFeedRuntimeManager(FeedId feedId, int partition);
+ /**
+ * @param feedId
+ */
+ public void deRegisterSourceFeed(FeedId feedId);
+
+ /**
+ * @param feedId
+ * @param feedDistributor
+ */
+ void registerSourceFeed(FeedId feedId, IFeedDistributor feedDistributor);
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
index d5d4cc2..639b77a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
@@ -40,6 +40,8 @@
AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.DATASET);
AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.FEED_INGEST, false);
AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.FEED_INGEST);
+ AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.FEED_INTERCEPT, false);
+ AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.FEED_INTERCEPT);
}
public static void addMetadataBuiltinFunctions() {
@@ -129,6 +131,49 @@
return t2;
}
});
+
+ AsterixBuiltinFunctions.addFunction(AsterixBuiltinFunctions.FEED_INTERCEPT, new IResultTypeComputer() {
+
+ @Override
+ public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
+ IMetadataProvider<?, ?> mp) throws AlgebricksException {
+ AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+ if (f.getArguments().size() != 1) {
+ throw new AlgebricksException("dataset arity is 1, not " + f.getArguments().size());
+ }
+ ILogicalExpression a1 = f.getArguments().get(0).getValue();
+ IAType t1 = (IAType) env.getType(a1);
+ if (t1.getTypeTag() == ATypeTag.ANY) {
+ return BuiltinType.ANY;
+ }
+ if (t1.getTypeTag() != ATypeTag.STRING) {
+ throw new AlgebricksException("Illegal type " + t1 + " for dataset() argument.");
+ }
+ if (a1.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ return BuiltinType.ANY;
+ }
+ AsterixConstantValue acv = (AsterixConstantValue) ((ConstantExpression) a1).getValue();
+ String datasetArg = ((AString) acv.getObject()).getStringValue();
+ AqlMetadataProvider metadata = ((AqlMetadataProvider) mp);
+ Pair<String, String> datasetInfo = getDatasetInfo(metadata, datasetArg);
+ String dataverseName = datasetInfo.first;
+ String datasetName = datasetInfo.second;
+ if (dataverseName == null) {
+ throw new AlgebricksException("Unspecified dataverse!");
+ }
+ Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse "
+ + dataverseName);
+ }
+ String tn = dataset.getItemTypeName();
+ IAType t2 = metadata.findType(dataverseName, tn);
+ if (t2 == null) {
+ throw new AlgebricksException("No type for dataset " + datasetName);
+ }
+ return t2;
+ }
+ });
}
private static Pair<String, String> getDatasetInfo(AqlMetadataProvider metadata, String datasetArg) {
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 1a2a0db..e3be0df 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -194,8 +194,7 @@
"numeric-idiv", 2);
public final static FunctionIdentifier CARET = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "caret", 2);
- public final static FunctionIdentifier NUMERIC_ABS = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
- "abs", 1);
+ public final static FunctionIdentifier NUMERIC_ABS = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "abs", 1);
public final static FunctionIdentifier NUMERIC_CEILING = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"ceiling", 1);
public final static FunctionIdentifier NUMERIC_FLOOR = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -243,6 +242,8 @@
public final static FunctionIdentifier DATASET = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dataset", 1);
public final static FunctionIdentifier FEED_INGEST = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"feed-ingest", 1);
+ public final static FunctionIdentifier FEED_INTERCEPT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "feed-intercept", 1);
public final static FunctionIdentifier INDEX_SEARCH = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"index-search", FunctionIdentifier.VARARGS);
@@ -424,7 +425,8 @@
"interval-overlaps", 2);
public final static FunctionIdentifier INTERVAL_OVERLAPPED_BY = new FunctionIdentifier(
FunctionConstants.ASTERIX_NS, "interval-overlapped-by", 2);
- public final static FunctionIdentifier OVERLAP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "interval-overlapping", 2);
+ public final static FunctionIdentifier OVERLAP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "interval-overlapping", 2);
public final static FunctionIdentifier INTERVAL_STARTS = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"interval-starts", 2);
public final static FunctionIdentifier INTERVAL_STARTED_BY = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -958,12 +960,14 @@
static {
datasetFunctions.add(getAsterixFunctionInfo(DATASET));
datasetFunctions.add(getAsterixFunctionInfo(FEED_INGEST));
+ datasetFunctions.add(getAsterixFunctionInfo(FEED_INTERCEPT));
datasetFunctions.add(getAsterixFunctionInfo(INDEX_SEARCH));
}
static {
addUnnestFun(DATASET, false);
addUnnestFun(FEED_INGEST, false);
+ addUnnestFun(FEED_INTERCEPT, false);
addUnnestFun(RANGE, true);
addUnnestFun(SCAN_COLLECTION, false);
addUnnestFun(SUBSET_COLLECTION, false);