remove staticness of lifecyclemanager and asterixthreadexecutor/factory; remove unnecessary asterixappruntimecontext used only for recovery
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index f8d5ea2..7366c54 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -153,6 +153,11 @@
<artifactId>jdom</artifactId>
<version>1.0</version>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>0.8.1-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
index a48cfb8..64f947c 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
@@ -75,6 +75,6 @@
} catch (Exception e) {
throw new HyracksDataException("initialization of adapter failed", e);
}
- return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
+ return new FeedIntakeOperatorNodePushable(ctx, feedId, adapter, partition);
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
index 31470f3..699f3d6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
@@ -17,7 +17,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
-import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.external.feed.lifecycle.AlterFeedMessage;
import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
@@ -25,6 +25,7 @@
import edu.uci.ics.asterix.external.feed.lifecycle.IFeedManager;
import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
@@ -33,6 +34,7 @@
*/
public class FeedIntakeOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
private final IDatasourceAdapter adapter;
private final int partition;
private final IFeedManager feedManager;
@@ -40,7 +42,9 @@
private final LinkedBlockingQueue<IFeedMessage> inbox;
private FeedInboxMonitor feedInboxMonitor;
- public FeedIntakeOperatorNodePushable(FeedId feedId, IDatasourceAdapter adapter, int partition) {
+ public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IDatasourceAdapter adapter,
+ int partition) {
+ this.ctx = ctx;
this.adapter = adapter;
this.partition = partition;
this.feedManager = (IFeedManager) FeedManager.INSTANCE;
@@ -52,7 +56,8 @@
public void open() throws HyracksDataException {
if (adapter instanceof IManagedFeedAdapter) {
feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
- AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
+ ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ .getThreadExecutor().execute(feedInboxMonitor);
feedManager.registerFeedMsgQueue(feedId, inbox);
}
writer.open();
@@ -82,7 +87,7 @@
@Override
public void close() throws HyracksDataException {
-
+
}
@Override