[NO ISSUE][*DB][ACT] += ActiveManagerMessage.GENERIC_EVENT
Change-Id: I8f8986a90a6ac34a24118ace1a76401d65924055
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10863
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index ba04967..08e1be4 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -112,14 +112,31 @@
case REQUEST_STATS:
requestStats((ActiveStatsRequestMessage) message);
break;
+ case GENERIC_EVENT:
+ deliverGenericEvent(message);
+ break;
default:
LOGGER.warn("Unknown message type received: " + message.getKind());
}
}
+ private void deliverGenericEvent(ActiveManagerMessage message) throws HyracksDataException {
+ try {
+ ActiveRuntimeId runtimeId = message.getRuntimeId();
+ IActiveRuntime runtime = runtimes.get(runtimeId);
+ if (runtime == null) {
+ LOGGER.warn("Request for a runtime {} that is not registered {}", runtimeId, message);
+ return;
+ }
+ runtime.handleGenericEvent(message);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
private void requestStats(ActiveStatsRequestMessage message) throws HyracksDataException {
try {
- ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
+ ActiveRuntimeId runtimeId = message.getRuntimeId();
IActiveRuntime runtime = runtimes.get(runtimeId);
long reqId = message.getReqId();
if (runtime == null) {
@@ -168,7 +185,7 @@
@SuppressWarnings("squid:S1181") // Catch Error
private void stopRuntime(ActiveManagerMessage message) {
StopRuntimeParameters content = (StopRuntimeParameters) message.getPayload();
- ActiveRuntimeId runtimeId = content.getRuntimeId();
+ ActiveRuntimeId runtimeId = message.getRuntimeId();
IActiveRuntime runtime = runtimes.get(runtimeId);
if (runtime == null) {
LOGGER.warn("Request to stop runtime: " + runtimeId
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index a52f01e..b8edc64 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -20,6 +20,7 @@
import java.util.concurrent.TimeUnit;
+import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IActiveRuntime {
@@ -48,4 +49,8 @@
default String getStats() {
return "\"Runtime stats is not available.\"";
}
+
+ default void handleGenericEvent(ActiveManagerMessage event) throws HyracksDataException {
+ throw new IllegalStateException("generic events not supported for runtime " + getRuntimeId());
+ }
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 1a2af13..4d726cf 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -21,6 +21,7 @@
import java.io.Serializable;
import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.messaging.CcIdentifiedMessage;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
@@ -29,15 +30,18 @@
public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddressedMessage {
public enum Kind {
STOP_ACTIVITY,
- REQUEST_STATS
+ REQUEST_STATS,
+ GENERIC_EVENT
}
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final Kind kind;
+ private final ActiveRuntimeId runtimeId;
private final Serializable payload;
- public ActiveManagerMessage(Kind kind, Serializable payload) {
+ public ActiveManagerMessage(Kind kind, ActiveRuntimeId runtimeId, Serializable payload) {
this.kind = kind;
+ this.runtimeId = runtimeId;
this.payload = payload;
}
@@ -45,6 +49,10 @@
return payload;
}
+ public ActiveRuntimeId getRuntimeId() {
+ return runtimeId;
+ }
+
public Kind getKind() {
return kind;
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
index 117a68c..94668a0 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.active.message;
-import java.io.Serializable;
+import org.apache.asterix.active.ActiveRuntimeId;
public class ActiveStatsRequestMessage extends ActiveManagerMessage {
private static final long serialVersionUID = 1L;
private final long reqId;
- public ActiveStatsRequestMessage(Serializable payload, long reqId) {
- super(Kind.REQUEST_STATS, payload);
+ public ActiveStatsRequestMessage(ActiveRuntimeId runtimeId, long reqId) {
+ super(Kind.REQUEST_STATS, runtimeId, null);
this.reqId = reqId;
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java
index fbc41a1..c21f06e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java
@@ -21,25 +21,17 @@
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
-import org.apache.asterix.active.ActiveRuntimeId;
-
public class StopRuntimeParameters implements Serializable {
- private static final long serialVersionUID = 1L;
- private final ActiveRuntimeId runtimeId;
+ private static final long serialVersionUID = 2L;
private final long timeout;
private final TimeUnit unit;
- public StopRuntimeParameters(ActiveRuntimeId runtimeId, long timeout, TimeUnit unit) {
- this.runtimeId = runtimeId;
+ public StopRuntimeParameters(long timeout, TimeUnit unit) {
this.timeout = timeout;
this.unit = unit;
}
- public ActiveRuntimeId getRuntimeId() {
- return runtimeId;
- }
-
public long getTimeout() {
return timeout;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index c90cde0..5f7d65e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -515,7 +515,7 @@
}
ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++);
messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
- new StopRuntimeParameters(runtimeId, timeout, unit)), location);
+ runtimeId, new StopRuntimeParameters(timeout, unit)), location);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index cb97526..0e6ddb2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -23,6 +23,7 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
@@ -95,4 +96,8 @@
default LongSupplier getLineNumber() {
return ExternalDataConstants.NO_LINES;
}
+
+ default void handleGenericEvent(ActiveManagerMessage event) {
+ throw new IllegalStateException("unexpected generic event " + event);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index b58e604..94d9e6e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.IOException;
+import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -57,4 +58,8 @@
feedLogManager.close();
}
}
+
+ public void handleGenericEvent(ActiveManagerMessage event) {
+ throw new IllegalStateException("unexpected generic event " + event);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 56257a8..8cec5de 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.IRawRecord;
@@ -280,4 +281,9 @@
.append("}");
return str.toString();
}
+
+ @Override
+ public void handleGenericEvent(ActiveManagerMessage event) {
+ recordReader.handleGenericEvent(event);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 54e633a..fc9b727 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.IOException;
+import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -69,4 +70,8 @@
public void close() throws IOException {
controller.close();
}
+
+ public void handleGenericEvent(ActiveManagerMessage event) {
+ controller.handleGenericEvent(event);
+ }
}