Improve performance of NotifyBrokerRuntime code
Change-Id: Ia4ecd381d102c67f7c66cfa965312bfb885aa281
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index b7d775a..cc8204e 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -276,6 +276,16 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>algebricks-data</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-control-common</artifactId>
<version>${hyracks.version}</version>
</dependency>
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 6ffb244..8e07af2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -19,34 +19,33 @@
package org.apache.asterix.bad.runtime;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AOrderedlistPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARecordPrinterFactory;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AUUID;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.data.IPrinter;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -58,15 +57,19 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
private static final Logger LOGGER = Logger.getLogger(NotifyBrokerRuntime.class.getName());
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream di = new DataInputStream(bbis);
- private final AOrderedListSerializerDeserializer subSerDes =
- new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null));
- private final ARecordSerializerDeserializer recordSerDes;
+ private static final AStringSerializerDeserializer stringSerDes =
+ new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+
+ private final IPrinter recordPrinterFactory;
+ private final IPrinter subscriptionIdListPrinterFactory;
private IPointable inputArg0 = new VoidPointable();
private IPointable inputArg1 = new VoidPointable();
@@ -74,14 +77,14 @@
private IScalarEvaluator eval0;
private IScalarEvaluator eval1;
private IScalarEvaluator eval2;
- private final ActiveManager activeManager;
private final EntityId entityId;
private final boolean push;
- private AOrderedList pushList;
- private ARecord pushRecord;
- private final IAType recordType;
- private final Map<String, HashSet<String>> sendData = new HashMap<>();
+ private final Map<String, String> sendData = new HashMap<>();
+ private final Map<String, ByteArrayOutputStream> sendbaos = new HashMap<>();
+ private final Map<String, PrintStream> sendStreams = new HashMap<>();
private String executionTimeString;
+ private boolean firstResult = true;
+ String endpoint;
public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
@@ -90,14 +93,11 @@
eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
eval1 = pushListEvalFactory.createScalarEvaluator(ctx);
eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
- this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
- .getApplicationContext()).getActiveManager();
this.entityId = activeJobId;
this.push = push;
- this.pushList = null;
- this.pushRecord = null;
- this.recordType = recordType;
- recordSerDes = new ARecordSerializerDeserializer((ARecordType) recordType);
+ recordPrinterFactory = new ARecordPrinterFactory((ARecordType) recordType).createPrinter();
+ subscriptionIdListPrinterFactory =
+ new AOrderedlistPrinterFactory(new AOrderedListType(BuiltinType.AUUID, null)).createPrinter();
executionTimeString = null;
}
@@ -106,28 +106,18 @@
return;
}
- private void addSubscriptions(String endpoint, AOrderedList subscriptionIds) {
- for (int i = 0; i < subscriptionIds.size(); i++) {
- AUUID subId = (AUUID) subscriptionIds.getItem(i);
- String subscriptionString = subId.toString();
- //Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value
- subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2);
- subscriptionString = "\"" + subscriptionString + "\"";
- sendData.get(endpoint).add(subscriptionString);
- }
- }
-
public String createData(String endpoint) {
- String JSON = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
- + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
- + executionTimeString + "\", \"subscriptionIds\":[";
- for (String value : sendData.get(endpoint)) {
- JSON += value;
- JSON += ",";
+ String resultTitle = "\"subscriptionIds";
+ if (push) {
+ resultTitle = "\"results\"";
}
- JSON = JSON.substring(0, JSON.length() - 1);
- JSON += "]}";
- return JSON;
+ String jsonStr = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
+ + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+ + executionTimeString + "\", " + resultTitle + ":[";
+ jsonStr += sendData.get(endpoint);
+ jsonStr = jsonStr.substring(0, jsonStr.length());
+ jsonStr += "]}";
+ return jsonStr;
}
@@ -172,6 +162,11 @@
eval1.evaluate(tRef, inputArg1);
eval2.evaluate(tRef, inputArg2);
+ /*The incoming tuples have three fields:
+ 1. eval0 will get the serialized broker endpoint string
+ 2. eval1 will get the payload (either the subscriptionIds or entire results)
+ 3. eval2 will get the channel execution time stamp (the same for all tuples)
+ */
if (executionTimeString == null) {
int resultSetOffset = inputArg2.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
@@ -185,34 +180,51 @@
int serBrokerOffset = inputArg0.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
- String endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
- sendData.putIfAbsent(endpoint, new HashSet<>());
+ endpoint = stringSerDes.deserialize(di).getStringValue();
+ sendbaos.putIfAbsent(endpoint, new ByteArrayOutputStream());
+ try {
+ sendStreams.putIfAbsent(endpoint,
+ new PrintStream(sendbaos.get(endpoint), true, StandardCharsets.UTF_8.name()));
+ } catch (UnsupportedEncodingException e) {
+ throw new HyracksDataException(e.getMessage());
+ }
if (push) {
int pushOffset = inputArg1.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
- //TODO: Right now this creates an object per channel result. Need to find a better way to deserialize
- pushRecord = recordSerDes.deserialize(di);
- sendData.get(endpoint).add(pushRecord.toString());
+ if (!firstResult) {
+ sendStreams.get(endpoint).append(',');
+ }
+ recordPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(), inputArg1.getLength(),
+ sendStreams.get(endpoint));
} else {
- int serSubOffset = inputArg1.getStartOffset();
- bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
- pushList = subSerDes.deserialize(di);
- addSubscriptions(endpoint, pushList);
+ if (!firstResult) {
+ sendStreams.get(endpoint).append(',');
+ }
+ subscriptionIdListPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(),
+ inputArg1.getLength(),
+ sendStreams.get(endpoint));
}
+ firstResult = false;
}
}
@Override
public void close() throws HyracksDataException {
- for (String endpoint : sendData.keySet()) {
- if (sendData.get(endpoint).size() > 0) {
- sendGroupOfResults(endpoint);
- sendData.get(endpoint).clear();
+ for (String endpoint : sendStreams.keySet()) {
+ sendData.put(endpoint, new String(sendbaos.get(endpoint).toByteArray(), StandardCharsets.UTF_8));
+ sendGroupOfResults(endpoint);
+ sendStreams.get(endpoint).close();
+ try {
+ sendbaos.get(endpoint).close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e.getMessage());
}
+
}
+
return;
}