checkpoint
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index d9c9d2f..f4b7f27 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -61,7 +61,6 @@
FEED,
EXTERNAL,
EXTERNAL_FEED,
- FEED_INTERCEPT
}
public AqlDataSource(AqlSourceId id, Dataset dataset, IAType itemType, AqlDataSourceType datasourceType)
@@ -77,7 +76,6 @@
initInternalDataset(itemType);
break;
}
- case FEED_INTERCEPT:
case EXTERNAL_FEED:
case EXTERNAL: {
initExternalDataset(itemType);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index b3dbf48..b0ddbbf 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -333,53 +333,12 @@
case EXTERNAL:
return buildExternalDataScannerRuntime(jobSpec, itemType,
(ExternalDatasetDetails) dataset.getDatasetDetails(), NonTaggedDataFormat.INSTANCE);
- case FEED_INTERCEPT:
- return buildFeedInterceptRuntime(jobSpec, dataset, dataSource);
default:
throw new IllegalStateException("Unknown aql datasource type: "
+ ((AqlDataSource) dataSource).getDatasourceType());
}
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedInterceptRuntime(
- JobSpecification jobSpec, Dataset dataset, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
- FeedActivity feedActivity = null;
- try {
- feedActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), null);
- if (!FeedUtil.isFeedActive(feedActivity)) {
- throw new AlgebricksException("Source feed " + dataset.getDataverseName() + ":"
- + dataset.getDatasetName() + " is not " + FeedState.ACTIVE);
- }
- String dataTypeName = dataset.getItemTypeName();
- Datatype datatype = MetadataManager.INSTANCE
- .getDatatype(mdTxnCtx, dataset.getDataverseName(), dataTypeName);
- ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
- .getSerializerDeserializer(datatype.getDatatype());
-
- RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
- FeedInterceptScanOperatorDescriptor feedInterceptScan = new FeedInterceptScanOperatorDescriptor(jobSpec,
- scannerDesc, new FeedId(dataset.getDataverseName(), dataset.getDatasetName()));
-
- FeedActivity beginFeedActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx,
- dataset.getDataverseName(), dataset.getDatasetName(), FeedActivityType.FEED_BEGIN);
- String[] computeLocations = beginFeedActivity.getFeedActivityDetails()
- .get(FeedActivityDetails.COMPUTE_LOCATIONS).split(",");
- AlgebricksPartitionConstraint constraint;
- try {
- constraint = new AlgebricksAbsolutePartitionConstraint(computeLocations);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
-
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedInterceptScan, constraint);
-
- } catch (Exception e) {
- throw new AlgebricksException("Unable to create feed intercept scan runtime", e);
- }
- }
-
@SuppressWarnings("rawtypes")
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
JobSpecification jobSpec, IAType itemType, ExternalDatasetDetails datasetDetails, IDataFormat format)
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index 2b04363..b3321cb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -31,7 +31,6 @@
}
private Map<FeedId, List<AdapterRuntimeManager>> activeFeedRuntimeManagers = new HashMap<FeedId, List<AdapterRuntimeManager>>();
- private Map<FeedId, IFeedDistributor> activeFeedDistributors = new HashMap<FeedId, IFeedDistributor>();
@Override
public synchronized void registerFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr) {
@@ -68,22 +67,8 @@
return null;
}
- @Override
- public synchronized void registerSourceFeed(FeedId feedId, IFeedDistributor feedDistributor) {
- activeFeedDistributors.put(feedId, feedDistributor);
- }
-
- @Override
- public synchronized void deRegisterSourceFeed(FeedId feedId) {
- activeFeedDistributors.remove(feedId);
- }
-
public List<AdapterRuntimeManager> getFeedRuntimeManagers(FeedId feedId) {
return activeFeedRuntimeManagers.get(feedId);
}
- public IFeedDistributor getFeedDistributor(FeedId feedId) {
- return activeFeedDistributors.get(feedId);
- }
-
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index cbabd7e..46302e8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -36,15 +36,4 @@
*/
public AdapterRuntimeManager getFeedRuntimeManager(FeedId feedId, int partition);
- /**
- * @param feedId
- */
- public void deRegisterSourceFeed(FeedId feedId);
-
- /**
- * @param feedId
- * @param feedDistributor
- */
- void registerSourceFeed(FeedId feedId, IFeedDistributor feedDistributor);
-
}