[NO ISSUE][RT] Profiling fixes
- Fix profile serialization for profiles > 64kb in size
- Break deadlock for OperatorNodePushables that keep initialize() open
and call downstream operators that block until other operators have
initialized (e.g. Intersect).
Change-Id: I87ec970eaf2d5db76e7bfaa60be9190efb1a70ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4263
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Sandeep Gupta <Sandeep.gupta@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index c0cf8eb..a5c8162 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -54,11 +54,15 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
/**
* An interface that takes care of executing a list of statements that are submitted through an Asterix API
*/
public interface IStatementExecutor {
+ public static final char UNIT_SEPARATOR = 31;
+ public static final char END_OF_BLOCK = 23;
/**
* Specifies result delivery of executed statements
@@ -200,12 +204,35 @@
private void writeObject(ObjectOutputStream out) throws IOException {
ObjectMapper om = new ObjectMapper();
- out.writeUTF(om.writeValueAsString(profile));
+ java.lang.String prof = om.writeValueAsString(profile);
+ //split the string if it is >=64K to avoid writeUTF limit
+ List<String> pieces;
+ if (prof.length() > 65534L) {
+ pieces = Lists.newArrayList(Splitter.fixedLength(32768).split(prof));
+ } else {
+ pieces = Lists.newArrayList(prof);
+ }
+
+ for (int i = 0; i < pieces.size(); i++) {
+ if (i == pieces.size() - 1) {
+ out.writeChar(UNIT_SEPARATOR);
+ } else {
+ out.writeChar(END_OF_BLOCK);
+ }
+ out.writeUTF(pieces.get(i));
+ }
+
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
ObjectMapper om = new ObjectMapper();
- JsonNode inNode = om.readTree(in.readUTF());
+ StringBuilder objSplits = new StringBuilder();
+ for (char cmd = in.readChar(); cmd != END_OF_BLOCK && cmd == UNIT_SEPARATOR; cmd = in.readChar()) {
+ objSplits.append(in.readUTF());
+ }
+ objSplits.append(in.readUTF());
+
+ JsonNode inNode = om.readTree(objSplits.toString());
if (!inNode.isObject()) {
throw new IOException("Deserialization error");
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
index 2d46bea..1d47c98 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
@@ -33,7 +33,7 @@
HashMap<Integer, IFrameWriter> inputs;
long frameStart;
- TimedOperatorNodePushable(IOperatorNodePushable op, IStatsCollector collector) throws HyracksDataException {
+ private TimedOperatorNodePushable(IOperatorNodePushable op, IStatsCollector collector) {
super(null, collector, op.getDisplayName());
this.op = op;
inputs = new HashMap<>();
@@ -41,20 +41,16 @@
@Override
public void initialize() throws HyracksDataException {
- synchronized (collector) {
- startClock();
- op.initialize();
- stopClock();
- }
+ startClock();
+ op.initialize();
+ stopClock();
}
@Override
public void deinitialize() throws HyracksDataException {
- synchronized (collector) {
- startClock();
- op.deinitialize();
- stopClock();
- }
+ startClock();
+ op.deinitialize();
+ stopClock();
}
@Override
@@ -83,15 +79,19 @@
}
private void stopClock() {
- pause();
- collector.giveClock(this);
+ synchronized (collector) {
+ pause();
+ collector.giveClock(this);
+ }
}
private void startClock() {
- if (frameStart > 0) {
- return;
+ synchronized (collector) {
+ if (frameStart > 0) {
+ return;
+ }
+ frameStart = collector.takeClock(this);
}
- frameStart = collector.takeClock(this);
}
@Override
@@ -113,11 +113,11 @@
}
}
- public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx)
- throws HyracksDataException {
+ public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx) {
if (!(op instanceof TimedOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) {
return new TimedOperatorNodePushable(op, ctx.getStatsCollector());
}
return op;
}
+
}