[NO ISSUE][RT] Profiling fixes

- Fix profile serialization for profiles > 64kb in size
- Break deadlock for OperatorNodePushables that keep initialize() open
  and call downstream operators that block until other operators have
  initialized (e.g. Intersect).

Change-Id: I87ec970eaf2d5db76e7bfaa60be9190efb1a70ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4263
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Sandeep Gupta <Sandeep.gupta@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index c0cf8eb..a5c8162 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -54,11 +54,15 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
 
 /**
  * An interface that takes care of executing a list of statements that are submitted through an Asterix API
  */
 public interface IStatementExecutor {
+    public static final char UNIT_SEPARATOR = 31;
+    public static final char END_OF_BLOCK = 23;
 
     /**
      * Specifies result delivery of executed statements
@@ -200,12 +204,35 @@
 
         private void writeObject(ObjectOutputStream out) throws IOException {
             ObjectMapper om = new ObjectMapper();
-            out.writeUTF(om.writeValueAsString(profile));
+            java.lang.String prof = om.writeValueAsString(profile);
+            //split the string if it is >=64K to avoid writeUTF limit
+            List<String> pieces;
+            if (prof.length() > 65534L) {
+                pieces = Lists.newArrayList(Splitter.fixedLength(32768).split(prof));
+            } else {
+                pieces = Lists.newArrayList(prof);
+            }
+
+            for (int i = 0; i < pieces.size(); i++) {
+                if (i == pieces.size() - 1) {
+                    out.writeChar(UNIT_SEPARATOR);
+                } else {
+                    out.writeChar(END_OF_BLOCK);
+                }
+                out.writeUTF(pieces.get(i));
+            }
+
         }
 
         private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
             ObjectMapper om = new ObjectMapper();
-            JsonNode inNode = om.readTree(in.readUTF());
+            StringBuilder objSplits = new StringBuilder();
+            for (char cmd = in.readChar(); cmd != END_OF_BLOCK && cmd == UNIT_SEPARATOR; cmd = in.readChar()) {
+                objSplits.append(in.readUTF());
+            }
+            objSplits.append(in.readUTF());
+
+            JsonNode inNode = om.readTree(objSplits.toString());
             if (!inNode.isObject()) {
                 throw new IOException("Deserialization error");
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
index 2d46bea..1d47c98 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
@@ -33,7 +33,7 @@
     HashMap<Integer, IFrameWriter> inputs;
     long frameStart;
 
-    TimedOperatorNodePushable(IOperatorNodePushable op, IStatsCollector collector) throws HyracksDataException {
+    private TimedOperatorNodePushable(IOperatorNodePushable op, IStatsCollector collector) {
         super(null, collector, op.getDisplayName());
         this.op = op;
         inputs = new HashMap<>();
@@ -41,20 +41,16 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        synchronized (collector) {
-            startClock();
-            op.initialize();
-            stopClock();
-        }
+        startClock();
+        op.initialize();
+        stopClock();
     }
 
     @Override
     public void deinitialize() throws HyracksDataException {
-        synchronized (collector) {
-            startClock();
-            op.deinitialize();
-            stopClock();
-        }
+        startClock();
+        op.deinitialize();
+        stopClock();
     }
 
     @Override
@@ -83,15 +79,19 @@
     }
 
     private void stopClock() {
-        pause();
-        collector.giveClock(this);
+        synchronized (collector) {
+            pause();
+            collector.giveClock(this);
+        }
     }
 
     private void startClock() {
-        if (frameStart > 0) {
-            return;
+        synchronized (collector) {
+            if (frameStart > 0) {
+                return;
+            }
+            frameStart = collector.takeClock(this);
         }
-        frameStart = collector.takeClock(this);
     }
 
     @Override
@@ -113,11 +113,11 @@
         }
     }
 
-    public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx)
-            throws HyracksDataException {
+    public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx) {
         if (!(op instanceof TimedOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) {
             return new TimedOperatorNodePushable(op, ctx.getStatsCollector());
         }
         return op;
     }
+
 }