minor changes
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
index 78daa08..5d9320c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -39,9 +39,20 @@
private FeedInboxMonitor feedInboxMonitor;
+ private int partition;
+
public enum State {
+ /*
+ * Indicates that data from external source will be pushed downstream for storage
+ */
ACTIVE_INGESTION,
+ /*
+ * Indicates that data from external source would be buffered and not pushed downstream
+ */
INACTIVE_INGESTION,
+ /*
+ * Indicates that feed ingestion activity has finished
+ */
FINISHED_INGESTION
}
@@ -49,6 +60,7 @@
int partition, LinkedBlockingQueue<IFeedMessage> inbox) {
this.feedId = feedId;
this.feedAdapter = feedAdapter;
+ this.partition = partition;
this.adapterExecutor = new AdapterExecutor(partition, writer, feedAdapter, this);
this.adapterRuntime = new Thread(adapterExecutor);
this.feedInboxMonitor = new FeedInboxMonitor(this, inbox, partition);
@@ -85,6 +97,11 @@
return feedId;
}
+ @Override
+ public String toString() {
+ return feedId + "[" + partition + "]";
+ }
+
public IFeedAdapter getFeedAdapter() {
return feedAdapter;
}
@@ -97,8 +114,6 @@
private MaterializingFrameWriter writer;
- private final int partition;
-
private IFeedAdapter adapter;
private AdapterRuntimeManager runtimeManager;
@@ -106,7 +121,6 @@
public AdapterExecutor(int partition, MaterializingFrameWriter writer, IFeedAdapter adapter,
AdapterRuntimeManager adapterRuntimeMgr) {
this.writer = writer;
- this.partition = partition;
this.adapter = adapter;
this.runtimeManager = adapterRuntimeMgr;
}
@@ -114,16 +128,21 @@
@Override
public void run() {
try {
+ int partition = runtimeManager.getPartition();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting ingestion for partition:" + partition);
}
adapter.start(partition, writer);
runtimeManager.setState(State.FINISHED_INGESTION);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Exception during feed ingestion " + e.getMessage());
+ }
+ e.printStackTrace();
+ } finally {
synchronized (runtimeManager) {
runtimeManager.notifyAll();
}
- } catch (Exception e) {
- e.printStackTrace();
}
}
@@ -140,30 +159,29 @@
}
}
- public int getPartition() {
- return partition;
- }
-
}
- public State getState() {
+ public synchronized State getState() {
return state;
}
@SuppressWarnings("incomplete-switch")
- public void setState(State state) throws HyracksDataException {
+ public synchronized void setState(State state) throws HyracksDataException {
if (this.state.equals(state)) {
return;
}
switch (state) {
case INACTIVE_INGESTION:
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("SETTING STORE MODE");
+ LOGGER.info("Set " + Mode.STORE + " mode");
}
adapterExecutor.getWriter().setMode(Mode.STORE);
break;
case ACTIVE_INGESTION:
adapterExecutor.getWriter().setMode(Mode.FORWARD);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Set " + Mode.FORWARD + " mode");
+ }
break;
}
this.state = state;
@@ -173,4 +191,8 @@
return adapterExecutor;
}
+ public int getPartition() {
+ return partition;
+ }
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index 7c9a6d8..718310b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -55,7 +55,7 @@
List<AdapterRuntimeManager> adapterRuntimeMgrs = activeFeedRuntimeManagers.get(feedId);
if (adapterRuntimeMgrs != null) {
for (AdapterRuntimeManager mgr : adapterRuntimeMgrs) {
- if (mgr.getAdapterExecutor().getPartition() == partition) {
+ if (mgr.getPartition() == partition) {
return mgr;
}
}