Fixed Feed Connect Statement
This change includes two fixes:
1. Feed connect doesn't return until the connection is complete.
2. When using wait for completion, it waits until all the jobs
complete.
Change-Id: I416bf4917b1f5cea687d1202c435f7183136cf1f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/726
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
index d729680..04f20fb 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
@@ -32,15 +32,15 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.asterix.app.external.FeedLifecycleListener.Message;
+import org.apache.asterix.app.external.FeedLifecycleListener.FeedEvent;
import org.apache.asterix.app.external.FeedWorkCollection.SubscribeFeedWork;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.external.feed.api.FeedOperationCounter;
import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
import org.apache.asterix.external.feed.api.IFeedJoint.State;
+import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.management.FeedConnectionRequest;
import org.apache.asterix.external.feed.management.FeedId;
@@ -59,6 +59,7 @@
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
@@ -78,37 +79,43 @@
private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());
- private final LinkedBlockingQueue<Message> inbox;
+ private final LinkedBlockingQueue<FeedEvent> inbox;
private final Map<FeedConnectionId, List<IFeedLifecycleEventSubscriber>> eventSubscribers;
private final Map<JobId, FeedJobInfo> jobInfos;
private final Map<FeedId, FeedIntakeInfo> intakeJobInfos;
private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
- private final Map<FeedId, List<IFeedJoint>> feedPipeline;
+ private final Map<FeedId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline;
private final Map<FeedConnectionId, Pair<IIntakeProgressTracker, Long>> feedIntakeProgressTrackers;
- public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) {
+ public FeedJobNotificationHandler(LinkedBlockingQueue<FeedEvent> inbox) {
this.inbox = inbox;
this.jobInfos = new HashMap<JobId, FeedJobInfo>();
this.intakeJobInfos = new HashMap<FeedId, FeedIntakeInfo>();
this.connectJobInfos = new HashMap<FeedConnectionId, FeedConnectJobInfo>();
- this.feedPipeline = new HashMap<FeedId, List<IFeedJoint>>();
+ this.feedPipeline = new HashMap<FeedId, Pair<FeedOperationCounter, List<IFeedJoint>>>();
this.eventSubscribers = new HashMap<FeedConnectionId, List<IFeedLifecycleEventSubscriber>>();
this.feedIntakeProgressTrackers = new HashMap<FeedConnectionId, Pair<IIntakeProgressTracker, Long>>();
}
@Override
public void run() {
- Message mesg;
+ FeedEvent event;
while (true) {
try {
- mesg = inbox.take();
- switch (mesg.messageKind) {
+ event = inbox.take();
+ switch (event.eventKind) {
case JOB_START:
- handleJobStartMessage(mesg);
+ handleJobStartEvent(event);
break;
case JOB_FINISH:
- handleJobFinishMessage(mesg);
+ handleJobFinishEvent(event);
+ break;
+ case PROVIDER_READY:
+ handleProviderReady(event);
+ break;
+ default:
+ LOGGER.log(Level.WARNING, "Unknown Feed Event");
break;
}
} catch (Exception e) {
@@ -121,11 +128,11 @@
public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
IIntakeProgressTracker feedIntakeProgressTracker) {
if (feedIntakeProgressTrackers.get(connectionId) == null) {
- this.feedIntakeProgressTrackers.put(connectionId, new Pair<IIntakeProgressTracker, Long>(
- feedIntakeProgressTracker, 0L));
+ this.feedIntakeProgressTrackers.put(connectionId,
+ new Pair<IIntakeProgressTracker, Long>(feedIntakeProgressTracker, 0L));
} else {
- throw new IllegalStateException(" Progress tracker for connection " + connectionId
- + " is alreader registered");
+ throw new IllegalStateException(
+ " Progress tracker for connection " + connectionId + " is alreader registered");
}
}
@@ -149,29 +156,35 @@
return connectJobInfos.values();
}
- public void registerFeedJoint(IFeedJoint feedJoint) {
- List<IFeedJoint> feedJointsOnPipeline = feedPipeline.get(feedJoint.getOwnerFeedId());
+ public synchronized void registerFeedJoint(IFeedJoint feedJoint, int numOfPrividers) {
+ Pair<FeedOperationCounter, List<IFeedJoint>> feedJointsOnPipeline = feedPipeline
+ .get(feedJoint.getOwnerFeedId());
+
if (feedJointsOnPipeline == null) {
- feedJointsOnPipeline = new ArrayList<IFeedJoint>();
+ feedJointsOnPipeline = new Pair<FeedOperationCounter, List<IFeedJoint>>(
+ new FeedOperationCounter(numOfPrividers, 1), new ArrayList<IFeedJoint>());
feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline);
- feedJointsOnPipeline.add(feedJoint);
+ feedJointsOnPipeline.second.add(feedJoint);
} else {
- if (!feedJointsOnPipeline.contains(feedJoint)) {
- feedJointsOnPipeline.add(feedJoint);
+ if (!feedJointsOnPipeline.second.contains(feedJoint)) {
+ feedJointsOnPipeline.first.setJobsCount(feedJointsOnPipeline.first.getJobsCount() + 1);
+ feedJointsOnPipeline.second.add(feedJoint);
} else {
throw new IllegalArgumentException("Feed joint " + feedJoint + " already registered");
}
}
}
- public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec) throws HyracksDataException {
+ public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec)
+ throws HyracksDataException {
if (jobInfos.get(jobId) != null) {
throw new IllegalStateException("Feed job already registered");
}
- List<IFeedJoint> joints = feedPipeline.get(feedId);
+ Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.containsKey(feedId) ? feedPipeline.get(feedId)
+ : null;
IFeedJoint intakeJoint = null;
- for (IFeedJoint joint : joints) {
+ for (IFeedJoint joint : pair.second) {
if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) {
intakeJoint = joint;
break;
@@ -181,6 +194,7 @@
if (intakeJoint != null) {
FeedIntakeInfo intakeJobInfo = new FeedIntakeInfo(jobId, FeedJobState.CREATED, FeedJobInfo.JobType.INTAKE,
feedId, intakeJoint, jobSpec);
+ pair.first.setFeedJobInfo(intakeJobInfo);
intakeJobInfos.put(feedId, intakeJobInfo);
jobInfos.put(jobId, intakeJobInfo);
@@ -188,8 +202,8 @@
LOGGER.info("Registered feed intake [" + jobId + "]" + " for feed " + feedId);
}
} else {
- throw new HyracksDataException("Could not register feed intake job [" + jobId + "]" + " for feed "
- + feedId);
+ throw new HyracksDataException(
+ "Could not register feed intake job [" + jobId + "]" + " for feed " + feedId);
}
}
@@ -199,7 +213,7 @@
throw new IllegalStateException("Feed job already registered");
}
- List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId);
+ List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId).second;
FeedConnectionId cid = null;
IFeedJoint sourceFeedJoint = null;
for (IFeedJoint joint : feedJoints) {
@@ -238,7 +252,7 @@
intakeJobInfos.remove(info.getFeedId());
if (!info.getState().equals(FeedJobState.UNDER_RECOVERY)) {
- List<IFeedJoint> joints = feedPipeline.get(info.getFeedId());
+ List<IFeedJoint> joints = feedPipeline.get(info.getFeedId()).second;
joints.remove(info.getIntakeFeedJoint());
if (LOGGER.isLoggable(Level.INFO)) {
@@ -252,7 +266,7 @@
}
- private void handleJobStartMessage(Message message) throws Exception {
+ private void handleJobStartEvent(FeedEvent message) throws Exception {
FeedJobInfo jobInfo = jobInfos.get(message.jobId);
switch (jobInfo.getJobType()) {
case INTAKE:
@@ -265,7 +279,7 @@
}
- private void handleJobFinishMessage(Message message) throws Exception {
+ private void handleJobFinishEvent(FeedEvent message) throws Exception {
FeedJobInfo jobInfo = jobInfos.get(message.jobId);
switch (jobInfo.getJobType()) {
case INTAKE:
@@ -276,12 +290,22 @@
break;
case FEED_CONNECT:
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Collect Job finished for " + (FeedConnectJobInfo) jobInfo);
+ LOGGER.info("Collect Job finished for " + jobInfo);
}
handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo);
break;
}
+ }
+ private void handleProviderReady(FeedEvent message) {
+ FeedIntakeInfo jobInfo = (FeedIntakeInfo) jobInfos.get(message.jobId);
+ Pair<FeedOperationCounter, List<IFeedJoint>> feedCounter = feedPipeline.get(message.feedId);
+ feedCounter.first.setProvidersCount(feedCounter.first.getProvidersCount() - 1);;
+ if (feedCounter.first.getProvidersCount() == 0) {
+ jobInfo.getIntakeFeedJoint().setState(State.ACTIVE);
+ jobInfo.setState(FeedJobState.ACTIVE);
+ notifyFeedEventSubscribers(jobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
+ }
}
private synchronized void handleIntakeJobStartMessage(FeedIntakeInfo intakeJobInfo) throws Exception {
@@ -306,19 +330,16 @@
}
// intakeLocations is an ordered list; element at position i corresponds to location of i'th instance of operator
intakeJobInfo.setIntakeLocation(intakeLocations);
- intakeJobInfo.getIntakeFeedJoint().setState(State.ACTIVE);
- intakeJobInfo.setState(FeedJobState.ACTIVE);
-
- // notify event listeners
- notifyFeedEventSubscribers(intakeJobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
}
private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws RemoteException, ACIDException {
// set locations of feed sub-operations (intake, compute, store)
setLocations(cInfo);
+ Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(cInfo.getConnectionId().getFeedId());
+ pair.first.setJobsCount(pair.first.getJobsCount() + 1);
// activate joints
- List<IFeedJoint> joints = feedPipeline.get(cInfo.getConnectionId().getFeedId());
+ List<IFeedJoint> joints = pair.second;
for (IFeedJoint joint : joints) {
if (joint.getProvider().equals(cInfo.getConnectionId())) {
joint.setState(State.ACTIVE);
@@ -328,7 +349,6 @@
}
}
cInfo.setState(FeedJobState.ACTIVE);
-
// register activity in metadata
registerFeedActivity(cInfo);
// notify event listeners
@@ -413,20 +433,25 @@
return connectJobInfos.get(connectionId).getState();
}
- private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, Message message) throws Exception {
+ private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, FeedEvent message) throws Exception {
IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
JobInfo info = hcc.getJobInfo(message.jobId);
JobStatus status = info.getStatus();
- FeedLifecycleEvent event;
- event = status.equals(JobStatus.FAILURE) ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
- : FeedLifecycleEvent.FEED_ENDED;
-
+ FeedId feedId = intakeInfo.getFeedId();
+ Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId);
+ pair.first.setJobsCount(pair.first.getJobsCount() - 1);
+ if (status.equals(JobStatus.FAILURE)) {
+ pair.first.setFailedIngestion(true);
+ }
// remove feed joints
deregisterFeedIntakeJob(message.jobId);
// notify event listeners
- notifyFeedEventSubscribers(intakeInfo, event);
-
+ if (pair.first.getJobsCount() == 0) {
+ feedPipeline.remove(feedId);
+ notifyFeedEventSubscribers(intakeInfo, pair.first.isFailedIngestion()
+ ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : FeedLifecycleEvent.FEED_ENDED);
+ }
}
private void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
@@ -446,7 +471,8 @@
IFeedJoint feedJoint = cInfo.getSourceFeedJoint();
feedJoint.removeReceiver(connectionId);
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
+ LOGGER.info(
+ "Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
}
removeFeedJointsPostPipelineTermination(cInfo.getConnectionId());
}
@@ -458,6 +484,15 @@
}
deregisterFeedActivity(cInfo);
+ Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline
+ .get(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId());
+ pair.first.setJobsCount(pair.first.getJobsCount() - 1);
+ if (pair.first.getJobsCount() == 0) {
+ notifyFeedEventSubscribers(pair.first.getFeedJobInfo(), pair.first.isFailedIngestion()
+ ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : FeedLifecycleEvent.FEED_ENDED);
+ feedPipeline.remove(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId());
+ }
+
// notify event listeners
FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_ENDED;
notifyFeedEventSubscribers(cInfo, event);
@@ -486,11 +521,11 @@
feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_CONNECT_TIMESTAMP, (new Date()).toString());
try {
- FeedActivity feedActivity = new FeedActivity(cInfo.getConnectionId().getFeedId().getDataverse(), cInfo
- .getConnectionId().getFeedId().getFeedName(), cInfo.getConnectionId().getDatasetName(),
+ FeedActivity feedActivity = new FeedActivity(cInfo.getConnectionId().getFeedId().getDataverse(),
+ cInfo.getConnectionId().getFeedId().getFeedName(), cInfo.getConnectionId().getDatasetName(),
feedActivityDetails);
- CentralFeedManager.getInstance().getFeedLoadManager()
- .reportFeedActivity(cInfo.getConnectionId(), feedActivity);
+ CentralFeedManager.getInstance().getFeedLoadManager().reportFeedActivity(cInfo.getConnectionId(),
+ feedActivity);
} catch (Exception e) {
e.printStackTrace();
@@ -514,7 +549,7 @@
public void removeFeedJointsPostPipelineTermination(FeedConnectionId connectionId) {
FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
- List<IFeedJoint> feedJoints = feedPipeline.get(connectionId.getFeedId());
+ List<IFeedJoint> feedJoints = feedPipeline.get(connectionId.getFeedId()).second;
IFeedJoint sourceJoint = cInfo.getSourceFeedJoint();
List<FeedConnectionId> all = sourceJoint.getReceivers();
@@ -534,7 +569,7 @@
}
public List<String> getFeedComputeLocations(FeedId feedId) {
- List<IFeedJoint> feedJoints = feedPipeline.get(feedId);
+ List<IFeedJoint> feedJoints = feedPipeline.get(feedId).second;
for (IFeedJoint joint : feedJoints) {
if (joint.getFeedJointKey().getFeedId().equals(feedId)) {
return connectJobInfos.get(joint.getProvider()).getComputeLocations();
@@ -578,7 +613,8 @@
//============================
public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
- List<IFeedJoint> joints = feedPipeline.get(feedJointKey.getFeedId());
+ List<IFeedJoint> joints = feedPipeline.containsKey(feedJointKey.getFeedId())
+ ? feedPipeline.get(feedJointKey.getFeedId()).second : null;
if (joints != null && !joints.isEmpty()) {
for (IFeedJoint joint : joints) {
if (joint.getFeedJointKey().equals(feedJointKey)) {
@@ -598,7 +634,8 @@
}
public IFeedJoint getFeedJoint(FeedJointKey feedPointKey) {
- List<IFeedJoint> joints = feedPipeline.get(feedPointKey.getFeedId());
+ List<IFeedJoint> joints = feedPipeline.containsKey(feedPointKey.getFeedId())
+ ? feedPipeline.get(feedPointKey.getFeedId()).second : null;
if (joints != null && !joints.isEmpty()) {
for (IFeedJoint joint : joints) {
if (joint.getFeedJointKey().equals(feedPointKey)) {
@@ -615,7 +652,8 @@
return feedJoint;
} else {
String jointKeyString = feedJointKey.getStringRep();
- List<IFeedJoint> jointsOnPipeline = feedPipeline.get(feedJointKey.getFeedId());
+ List<IFeedJoint> jointsOnPipeline = feedPipeline.containsKey(feedJointKey.getFeedId())
+ ? feedPipeline.get(feedJointKey.getFeedId()).second : null;
IFeedJoint candidateJoint = null;
if (jointsOnPipeline != null) {
for (IFeedJoint joint : jointsOnPipeline) {
@@ -638,7 +676,7 @@
}
public IFeedJoint getFeedPoint(FeedId sourceFeedId, IFeedJoint.FeedJointType type) {
- List<IFeedJoint> joints = feedPipeline.get(sourceFeedId);
+ List<IFeedJoint> joints = feedPipeline.get(sourceFeedId).second;
for (IFeedJoint joint : joints) {
if (joint.getType().equals(type)) {
return joint;
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
index 8e44af4..d7129b8 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
@@ -88,7 +88,7 @@
public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
- private final LinkedBlockingQueue<Message> jobEventInbox;
+ private final LinkedBlockingQueue<FeedEvent> jobEventInbox;
private final LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
private final Map<FeedCollectInfo, List<String>> dependentFeeds = new HashMap<FeedCollectInfo, List<String>>();
private final Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue;
@@ -99,7 +99,7 @@
private ClusterState state;
private FeedLifecycleListener() {
- this.jobEventInbox = new LinkedBlockingQueue<Message>();
+ this.jobEventInbox = new LinkedBlockingQueue<FeedEvent>();
this.feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
this.responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
this.feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
@@ -114,14 +114,14 @@
@Override
public void notifyJobStart(JobId jobId) throws HyracksException {
if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
- jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_START));
+ jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.JOB_START));
}
}
@Override
public void notifyJobFinish(JobId jobId) throws HyracksException {
if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
- jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_FINISH));
+ jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.JOB_FINISH));
} else {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
@@ -178,19 +178,26 @@
return feedJobNotificationHandler.getFeedJobState(connectionId);
}
- public static class Message {
+ public static class FeedEvent {
public JobId jobId;
+ public FeedId feedId;
- public enum MessageKind {
+ public enum EventKind {
JOB_START,
- JOB_FINISH
+ JOB_FINISH,
+ PROVIDER_READY
}
- public MessageKind messageKind;
+ public EventKind eventKind;
- public Message(JobId jobId, MessageKind msgKind) {
+ public FeedEvent(JobId jobId, EventKind eventKind) {
+ this(jobId, eventKind, null);
+ }
+
+ public FeedEvent(JobId jobId, EventKind eventKind, FeedId feedId) {
this.jobId = jobId;
- this.messageKind = msgKind;
+ this.eventKind = eventKind;
+ this.feedId = feedId;
}
}
@@ -469,8 +476,8 @@
return feedJobNotificationHandler.isFeedPointAvailable(feedJointKey);
}
- public void registerFeedJoint(IFeedJoint feedJoint) {
- feedJobNotificationHandler.registerFeedJoint(feedJoint);
+ public void registerFeedJoint(IFeedJoint feedJoint, int numOfPrividers) {
+ feedJobNotificationHandler.registerFeedJoint(feedJoint, numOfPrividers);
}
public IFeedJoint getFeedJoint(FeedJointKey feedJointKey) {
@@ -496,4 +503,8 @@
return feedJobNotificationHandler.getFeedCollectJobId(connectionId);
}
+ public void notifyProviderReady(FeedId feedId, JobId jobId) {
+ jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.PROVIDER_READY, feedId));
+ }
+
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 213b090..67bf3bb 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -26,7 +26,6 @@
import java.io.InputStreamReader;
import java.rmi.RemoteException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
@@ -2205,8 +2204,9 @@
metadataProvider, policyAccessor);
//adapter configuration are valid at this stage
//register the feed joints (these are auto-de-registered)
+ int numOfPrividers = pair.second.getPartitionConstraint().getLocations().length;
for (IFeedJoint fj : triple.third) {
- FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
+ FeedLifecycleListener.INSTANCE.registerFeedJoint(fj, numOfPrividers);
}
JobUtils.runJob(hcc, pair.first, false);
/*
@@ -2220,7 +2220,7 @@
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
} else {
for (IFeedJoint fj : triple.third) {
- FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
+ FeedLifecycleListener.INSTANCE.registerFeedJoint(fj, 0);
}
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 258bc35..2378032 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -25,9 +25,10 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.app.external.FeedLifecycleListener;
import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
@@ -36,6 +37,7 @@
import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -79,12 +81,20 @@
case COMPLETE_FAILBACK_RESPONSE:
handleCompleteFailbcakResponse(message);
break;
+ case FEED_PROVIDER_READY:
+ handleFeedProviderReady(message);
+ break;
default:
LOGGER.warning("Unknown message: " + absMessage.getMessageType());
break;
}
}
+ private void handleFeedProviderReady(IMessage message) {
+ FeedProviderReadyMessage msg = (FeedProviderReadyMessage) message;
+ FeedLifecycleListener.INSTANCE.notifyProviderReady(msg.getFeedId(), msg.getJobId());
+ }
+
private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception {
ResourceIdRequestMessage msg = (ResourceIdRequestMessage) message;
ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.query.aql
similarity index 92%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.query.aql
index 864ce01..b696974 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.query.aql
@@ -23,7 +23,7 @@
*/
use dataverse KeyVerse;
-for $d in dataset KVStore
-order by meta().id
-limit 5
-return meta().id;
+count(
+ for $d in dataset KVStore
+ return $d
+);
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.sleep.aql
deleted file mode 100644
index db6954e..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.sleep.aql
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description : Create a change feed and test ingestion of records
- * Expected Res : Success
- * Date : 24th Feb 2016
- */
-4000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql
index b696974..28e9a15 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql
@@ -24,6 +24,7 @@
use dataverse KeyVerse;
count(
- for $d in dataset KVStore
- return $d
+for $d in dataset KVStore
+distinct by meta()."key"
+return 1
);
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql
index 28e9a15..7423399 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql
@@ -23,8 +23,12 @@
*/
use dataverse KeyVerse;
-count(
+
for $d in dataset KVStore
-distinct by meta()."key"
-return 1
-);
+group by $vb := meta().vbucket with $d
+order by $vb
+limit 5
+return {
+ "vbucket": $vb,
+ "tuple_count": count($d)
+};
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.ddl.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.ddl.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.query.aql
deleted file mode 100644
index 7423399..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.query.aql
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description : Create a change feed and test ingestion of records
- * Expected Res : Success
- * Date : 24th Feb 2016
- */
-use dataverse KeyVerse;
-
-
-for $d in dataset KVStore
-group by $vb := meta().vbucket with $d
-order by $vb
-limit 5
-return {
- "vbucket": $vb,
- "tuple_count": count($d)
-};
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.7.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.7.ddl.aql
deleted file mode 100644
index d282e66..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.7.ddl.aql
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description : Create a change feed and test ingestion of records
- * Expected Res : Success
- * Date : 24th Feb 2016
- */
-drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
index 4d2f9c4..4c1635f 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
@@ -26,3 +26,4 @@
set wait-for-completion-feed "false";
connect feed TweetFeed to dataset Tweets;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.ddl.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.ddl.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
deleted file mode 100644
index e70df33..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description : Drop a dataverse with disconnected feed
- * Expected Res : Success
- * Date : 24th Feb 2016
- */
-3000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
deleted file mode 100644
index 34d6285..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description : Drop a dataverse with disconnected feed
- * Expected Res : Success
- * Date : 24th Feb 2016
- */
-
-use dataverse experiments;
-disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.server.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.server.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql
deleted file mode 100644
index eb18795..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description : Create a socket feed with a client that pushes
- * 10 records. The feed is connected to a dataset that is then
- * queried for the data.
- * Expected Res : Success
- * Date : 24th Feb 2016
- */
-3000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.sleep.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.sleep.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.update.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.update.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.query.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.query.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.server.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.server.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.ddl.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.ddl.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.query.aql
similarity index 92%
copy from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql
copy to asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.query.aql
index 864ce01..b696974 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.query.aql
@@ -23,7 +23,7 @@
*/
use dataverse KeyVerse;
-for $d in dataset KVStore
-order by meta().id
-limit 5
-return meta().id;
+count(
+ for $d in dataset KVStore
+ return $d
+);
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.sleep.aql
deleted file mode 100644
index db6954e..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.sleep.aql
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description : Create a change feed and test ingestion of records
- * Expected Res : Success
- * Date : 24th Feb 2016
- */
-4000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql
index b696974..864ce01 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql
@@ -23,7 +23,7 @@
*/
use dataverse KeyVerse;
-count(
- for $d in dataset KVStore
- return $d
-);
+for $d in dataset KVStore
+order by meta().id
+limit 5
+return meta().id;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.ddl.aql
similarity index 100%
copy from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.ddl.aql
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 5d2e263..fba74e8 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -35,7 +35,8 @@
PREPARE_PARTITIONS_FAILBACK_RESPONSE,
COMPLETE_FAILBACK_REQUEST,
COMPLETE_FAILBACK_RESPONSE,
- REPLICA_EVENT
+ REPLICA_EVENT,
+ FEED_PROVIDER_READY
}
public abstract ApplicationMessageType getMessageType();
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
new file mode 100644
index 0000000..7b74ef9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.external.feed.watch.FeedJobInfo;
+
+public class FeedOperationCounter {
+ private FeedJobInfo feedJobInfo;
+ private int providersCount;
+ private int jobsCount;
+ private boolean failedIngestion = false;
+
+ public FeedOperationCounter(int providersCount, int jobsCount) {
+ this.providersCount = providersCount;
+ this.jobsCount = jobsCount;
+ }
+
+ public int getProvidersCount() {
+ return providersCount;
+ }
+
+ public void setProvidersCount(int providersCount) {
+ this.providersCount = providersCount;
+ }
+
+ public int getJobsCount() {
+ return jobsCount;
+ }
+
+ public void setJobsCount(int jobsCount) {
+ this.jobsCount = jobsCount;
+ }
+
+ public boolean isFailedIngestion() {
+ return failedIngestion;
+ }
+
+ public void setFailedIngestion(boolean failedIngestion) {
+ this.failedIngestion = failedIngestion;
+ }
+
+ public FeedJobInfo getFeedJobInfo() {
+ return feedJobInfo;
+ }
+
+ public void setFeedJobInfo(FeedJobInfo feedJobInfo) {
+ this.feedJobInfo = feedJobInfo;
+ }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
new file mode 100644
index 0000000..4c81c5b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.job.JobId;
+
+public class FeedProviderReadyMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final FeedId feedId;
+ private final JobId jobId;
+
+ public FeedProviderReadyMessage(FeedId feedId, JobId jobId) {
+ this.feedId = feedId;
+ this.jobId = jobId;
+ }
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.FEED_PROVIDER_READY;
+ }
+
+ public FeedId getFeedId() {
+ return feedId;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index d0348c2..8475e45 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -147,22 +147,7 @@
}
private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
- int waitCycleCount = 0;
- ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
- while (ingestionRuntime == null && waitCycleCount < 1000) {
- try {
- Thread.sleep(3000);
- waitCycleCount++;
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- break;
- }
- ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
- }
- return (IngestionRuntime) ingestionRuntime;
+ return (IngestionRuntime) subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
}
public ConnectionLocation getSubscriptionLocation() {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index c1748d9..6771010 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -36,6 +36,7 @@
import org.apache.asterix.external.feed.api.ISubscriberRuntime;
import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
import org.apache.asterix.external.feed.runtime.CollectionRuntime;
@@ -115,6 +116,9 @@
ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
adapterRuntimeManager, ctx);
feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
+ // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
+ ctx.sendApplicationMessageToCC(new FeedProviderReadyMessage(feedId, ctx.getJobletContext().getJobId()),
+ null);
feedFrameWriter.open();
} else {
if (ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {