Added profiling visualization. Added multi-resolution profiling

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@903 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index 32a3afd..75cc245 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -62,6 +62,6 @@
             return new ActivityId(OperatorDescriptorId.parse(str.substring(0, idIdx)), Integer.parseInt(str
                     .substring(idIdx + 1)));
         }
-        throw new IllegalArgumentException();
+        throw new IllegalArgumentException("Unable to parse: " + str);
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
index 330e3a2..8794e09 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
@@ -55,6 +55,6 @@
             str = str.substring(5);
             return new OperatorDescriptorId(Integer.parseInt(str));
         }
-        throw new IllegalArgumentException();
+        throw new IllegalArgumentException("Unable to parse: " + str);
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
index 96437bf..0fb44c1 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
@@ -61,6 +61,6 @@
             int idIdx = str.lastIndexOf(':');
             return new TaskAttemptId(TaskId.parse(str.substring(0, idIdx)), Integer.parseInt(str.substring(idIdx + 1)));
         }
-        throw new IllegalArgumentException();
+        throw new IllegalArgumentException("Unable to parse: " + str);
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
index 78abaa2..7e0b22d 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -61,6 +61,6 @@
             int idIdx = str.lastIndexOf(':');
             return new TaskId(ActivityId.parse(str.substring(0, idIdx)), Integer.parseInt(str.substring(idIdx + 1)));
         }
-        throw new IllegalArgumentException();
+        throw new IllegalArgumentException("Unable to parse: " + str);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
index 3a86c42..8d244e8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
@@ -23,13 +23,13 @@
 import java.util.Map;
 
 import org.apache.wicket.markup.html.basic.Label;
-import org.apache.wicket.markup.html.panel.EmptyPanel;
 import org.apache.wicket.request.mapper.parameter.PageParameters;
 import org.apache.wicket.util.string.StringValue;
 import org.json.JSONArray;
 import org.json.JSONObject;
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -41,7 +41,6 @@
     private static final long serialVersionUID = 1L;
 
     private static final int HEIGHT = 29;
-    private static final int WIDTH = 9;
 
     public JobDetailsPage(PageParameters params) throws Exception {
         ClusterControllerService ccs = getAdminConsoleApplication().getClusterControllerService();
@@ -119,10 +118,11 @@
                                         tcAttempts[k].tasks = new TaskAttempt[taArray.length()];
                                         for (int l = 0; l < taArray.length(); ++l) {
                                             JSONObject taO = taArray.getJSONObject(l);
-                                            TaskAttempt ta = new TaskAttempt(taO.getString("task-id"),
-                                                    taO.getLong("start-time"), taO.getLong("end-time"));
+                                            TaskAttemptId taId = TaskAttemptId.parse(taO.getString("task-attempt-id"));
+                                            TaskAttempt ta = new TaskAttempt(taId, taO.getLong("start-time"),
+                                                    taO.getLong("end-time"));
                                             tcAttempts[k].tasks[l] = ta;
-                                            TaskId tid = TaskId.parse(taO.getString("task-id"));
+                                            TaskId tid = taId.getTaskId();
                                             ta.name = activityMap.get(tid.getActivityId());
                                             ta.partition = tid.getPartition();
                                         }
@@ -149,6 +149,46 @@
             }
         }
 
+        Map<TaskAttemptId, TaskProfile> tpMap = new HashMap<TaskAttemptId, TaskProfile>();
+        if (jrO.has("profile")) {
+            JSONObject pO = jrO.getJSONObject("profile");
+            if (pO.has("joblets")) {
+                JSONArray jobletsA = pO.getJSONArray("joblets");
+                for (int i = 0; i < jobletsA.length(); ++i) {
+                    JSONObject jobletO = jobletsA.getJSONObject(i);
+                    if (jobletO.has("tasks")) {
+                        JSONArray tasksA = jobletO.getJSONArray("tasks");
+                        for (int j = 0; j < tasksA.length(); ++j) {
+                            JSONObject taskO = tasksA.getJSONObject(j);
+                            ActivityId activityId = ActivityId.parse(taskO.getString("activity-id"));
+                            int partition = taskO.getInt("partition");
+                            int attempt = taskO.getInt("attempt");
+                            TaskAttemptId taId = new TaskAttemptId(new TaskId(activityId, partition), attempt);
+                            if (taskO.has("partition-send-profile")) {
+                                JSONArray taskProfilesA = taskO.getJSONArray("partition-send-profile");
+                                for (int k = 0; k < taskProfilesA.length(); ++k) {
+                                    JSONObject ppO = taskProfilesA.getJSONObject(k);
+                                    long openTime = ppO.getLong("open-time");
+                                    long closeTime = ppO.getLong("close-time");
+                                    int resolution = ppO.getInt("resolution");
+                                    long offset = ppO.getLong("offset");
+                                    JSONArray frameTimesA = ppO.getJSONArray("frame-times");
+                                    long[] frameTimes = new long[frameTimesA.length()];
+                                    for (int l = 0; l < frameTimes.length; ++l) {
+                                        frameTimes[l] = frameTimesA.getInt(l) + offset;
+                                    }
+                                    TaskProfile tp = new TaskProfile(taId, openTime, closeTime, frameTimes, resolution);
+                                    if (!tpMap.containsKey(tp.taId)) {
+                                        tpMap.put(tp.taId, tp);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
         if (!tcList.isEmpty()) {
             Collections.sort(tcList, new Comparator<TaskClusterAttempt[]>() {
                 @Override
@@ -188,11 +228,42 @@
                         buffer.append("<rect x=\"").append(tStartTime * width + leftOffset).append("\" y=\"")
                                 .append(y * (HEIGHT + 1) + HEIGHT / 4).append("\" width=\"")
                                 .append(width * (tEndTime - tStartTime)).append("\" height=\"").append(HEIGHT / 2)
-                                .append("\"/>\n");
+                                .append("\" style=\"fill:rgb(255,255,255);stroke-width:1;stroke:rgb(0,0,0)\"/>\n");
                         buffer.append("<text x=\"").append(tEndTime * width + leftOffset + 20).append("\" y=\"")
                                 .append(y * (HEIGHT + 1) + HEIGHT * 3 / 4).append("\">")
                                 .append((tEndTime - tStartTime) + " ms (" + ta.name + ":" + ta.partition + ")")
                                 .append("</text>\n");
+                        TaskProfile tp = tpMap.get(ta.taId);
+                        if (tp != null) {
+                            for (int k = 0; k < tp.frameTimes.length; ++k) {
+                                long taOpenTime = tp.openTime - minTime;
+                                buffer.append("<rect x=\"")
+                                        .append(taOpenTime * width + leftOffset)
+                                        .append("\" y=\"")
+                                        .append(y * (HEIGHT + 1) + HEIGHT / 4)
+                                        .append("\" width=\"1\" height=\"")
+                                        .append(HEIGHT / 2)
+                                        .append("\" style=\"fill:rgb(255,0,0);stroke-width:1;stroke:rgb(255,0,0)\"/>\n");
+                                for (long ft : tp.frameTimes) {
+                                    long taNextTime = ft - minTime;
+                                    buffer.append("<rect x=\"")
+                                            .append(taNextTime * width + leftOffset)
+                                            .append("\" y=\"")
+                                            .append(y * (HEIGHT + 1) + HEIGHT / 4)
+                                            .append("\" width=\"1\" height=\"")
+                                            .append(HEIGHT / 2)
+                                            .append("\" style=\"fill:rgb(0,255,0);stroke-width:1;stroke:rgb(0,255,0)\"/>\n");
+                                }
+                                long taCloseTime = tp.closeTime - minTime;
+                                buffer.append("<rect x=\"")
+                                        .append(taCloseTime * width + leftOffset)
+                                        .append("\" y=\"")
+                                        .append(y * (HEIGHT + 1) + HEIGHT / 4)
+                                        .append("\" width=\"1\" height=\"")
+                                        .append(HEIGHT / 2)
+                                        .append("\" style=\"fill:rgb(0,0,255);stroke-width:1;stroke:rgb(0,0,255)\"/>\n");
+                            }
+                        }
                         ++y;
                     }
                 }
@@ -209,96 +280,17 @@
             markup.setEscapeModelStrings(false);
             add(markup);
         }
-
-        List<TaskProfile> taskProfiles = new ArrayList<TaskProfile>();
-        if (jrO.has("profile")) {
-            JSONObject pO = jrO.getJSONObject("profile");
-            if (pO.has("joblets")) {
-                JSONArray jobletsA = pO.getJSONArray("joblets");
-                for (int i = 0; i < jobletsA.length(); ++i) {
-                    JSONObject jobletO = jobletsA.getJSONObject(i);
-                    if (jobletO.has("tasks")) {
-                        JSONArray tasksA = jobletO.getJSONArray("tasks");
-                        for (int j = 0; j < tasksA.length(); ++j) {
-                            JSONObject taskO = tasksA.getJSONObject(j);
-                            String activityId = taskO.getString("activity-id");
-                            int partition = taskO.getInt("partition");
-                            int attempt = taskO.getInt("attempt");
-                            if (taskO.has("partition-send-profile")) {
-                                JSONArray taskProfilesA = taskO.getJSONArray("partition-send-profile");
-                                for (int k = 0; k < taskProfilesA.length(); ++k) {
-                                    JSONObject ppO = taskProfilesA.getJSONObject(k);
-                                    long openTime = ppO.getLong("open-time");
-                                    long closeTime = ppO.getLong("close-time");
-                                    JSONArray frameTimesA = ppO.getJSONArray("frame-times");
-                                    long[] frameTimes = new long[frameTimesA.length()];
-                                    for (int l = 0; l < frameTimes.length; ++l) {
-                                        frameTimes[l] = frameTimesA.getLong(l);
-                                    }
-                                    taskProfiles.add(new TaskProfile(activityId, partition, attempt, openTime,
-                                            closeTime, frameTimes));
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        if (!taskProfiles.isEmpty()) {
-            Collections.sort(taskProfiles, new Comparator<TaskProfile>() {
-                @Override
-                public int compare(TaskProfile o1, TaskProfile o2) {
-                    return o1.openTime < o2.openTime ? -1 : (o1.openTime > o2.openTime ? 1 : 0);
-                }
-            });
-            long startTime = taskProfiles.get(0).openTime;
-            long timeRange = taskProfiles.get(taskProfiles.size() - 1).closeTime - startTime;
-            int n = taskProfiles.size();
-            StringBuilder buffer = new StringBuilder();
-            buffer.append("<svg viewBox=\"0 0 ").append((timeRange + 1) * (WIDTH + 1)).append(' ')
-                    .append((n + 1) * (HEIGHT + 1)).append("\" version=\"1.1\"\n");
-            buffer.append("xmlns=\"http://www.w3.org/2000/svg\">\n");
-            for (int i = 0; i < n; ++i) {
-                TaskProfile tp = taskProfiles.get(i);
-                open(buffer, i, tp.openTime - startTime);
-                for (long ft : tp.frameTimes) {
-                    nextFrame(buffer, i, ft - startTime);
-                }
-                close(buffer, i, tp.closeTime - startTime);
-            }
-            buffer.append("</svg>");
-            Label markup = new Label("job-timeline-profile", buffer.toString());
-            markup.setEscapeModelStrings(false);
-            add(markup);
-        } else {
-            add(new EmptyPanel("job-timeline-profile"));
-        }
-    }
-
-    private void open(StringBuilder buffer, int i, long openTime) {
-        buffer.append("<rect x=\"").append(openTime * (WIDTH + 1)).append("\" y=\"").append(i * (HEIGHT + 1))
-                .append("\" width=\"").append(WIDTH).append("\" height=\"").append(HEIGHT).append("\"/>\n");
-    }
-
-    private void close(StringBuilder buffer, int i, long closeTime) {
-        buffer.append("<rect x=\"").append(closeTime * (WIDTH + 1)).append("\" y=\"").append(i * (HEIGHT + 1))
-                .append("\" width=\"").append(WIDTH).append("\" height=\"").append(HEIGHT).append("\"/>\n");
-    }
-
-    private void nextFrame(StringBuilder buffer, int i, long frameTime) {
-        buffer.append("<rect x=\"").append(frameTime * (WIDTH + 1)).append("\" y=\"").append(i * (HEIGHT + 1))
-                .append("\" width=\"").append(WIDTH).append("\" height=\"").append(HEIGHT).append("\"/>\n");
     }
 
     private static class TaskAttempt {
-        private String taskId;
+        private TaskAttemptId taId;
         private long startTime;
         private long endTime;
         private String name;
         private int partition;
 
-        public TaskAttempt(String taskId, long startTime, long endTime) {
-            this.taskId = taskId;
+        public TaskAttempt(TaskAttemptId taId, long startTime, long endTime) {
+            this.taId = taId;
             this.startTime = startTime;
             this.endTime = endTime;
         }
@@ -320,21 +312,18 @@
     }
 
     private static class TaskProfile {
-        private String activityId;
-        private int partition;
-        private int attempt;
+        private TaskAttemptId taId;
         private long openTime;
         private long closeTime;
         private long[] frameTimes;
+        private int resolution;
 
-        public TaskProfile(String activityId, int partition, int attempt, long openTime, long closeTime,
-                long[] frameTimes) {
-            this.activityId = activityId;
-            this.partition = partition;
-            this.activityId = activityId;
+        public TaskProfile(TaskAttemptId taId, long openTime, long closeTime, long[] frameTimes, int resolution) {
+            this.taId = taId;
             this.openTime = openTime;
             this.closeTime = closeTime;
             this.frameTimes = frameTimes;
+            this.resolution = resolution;
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.html b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.html
index 8c5578d..d2068fd 100644
--- a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.html
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.html
@@ -6,5 +6,4 @@
     <div id="job-run" wicket:id="job-run" style="display: none;">
     </div>
     <div wicket:id="job-timeline" style="overflow: auto;"></div>
-    <div wicket:id="job-timeline-profile"></div>
 </wicket:extend>
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
new file mode 100644
index 0000000..f8fba7a
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.counters;
+
+import java.io.Serializable;
+
+public class MultiResolutionEventProfiler implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final int[] times;
+
+    private long offset;
+
+    private int ptr;
+
+    private int resolution;
+
+    private int eventCounter;
+
+    public MultiResolutionEventProfiler(int nSamples) {
+        times = new int[nSamples];
+        offset = -1;
+        ptr = 0;
+        resolution = 1;
+        eventCounter = 0;
+    }
+
+    public void reportEvent() {
+        ++eventCounter;
+        if (eventCounter % resolution != 0) {
+            return;
+        }
+        if (ptr >= times.length) {
+            compact();
+            return;
+        }
+        eventCounter = 0;
+        long time = System.currentTimeMillis();
+        if (offset < 0) {
+            offset = time;
+        }
+        int value = (int) (time - offset);
+        times[ptr++] = value;
+    }
+
+    private void compact() {
+        for (int i = 1; i < ptr / 2; ++i) {
+            times[i] = times[i * 2];
+        }
+        resolution <<= 1;
+        ptr >>= 1;
+    }
+
+    public int getResolution() {
+        return resolution;
+    }
+
+    public int getCount() {
+        return ptr;
+    }
+
+    public int[] getSamples() {
+        return times;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
index f6568d9..ef61796 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
@@ -17,6 +17,7 @@
 import java.io.Serializable;
 
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
 
 public class PartitionProfile implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -27,13 +28,13 @@
 
     private final long closeTime;
 
-    private final byte[] frameTimes;
+    private final MultiResolutionEventProfiler mrep;
 
-    public PartitionProfile(PartitionId pid, long openTime, long closeTime, byte[] frameTimes) {
+    public PartitionProfile(PartitionId pid, long openTime, long closeTime, MultiResolutionEventProfiler mrep) {
         this.pid = pid;
         this.openTime = openTime;
         this.closeTime = closeTime;
-        this.frameTimes = frameTimes;
+        this.mrep = mrep;
     }
 
     public PartitionId getPartitionId() {
@@ -48,7 +49,7 @@
         return closeTime;
     }
 
-    public byte[] getFrameTimes() {
-        return frameTimes;
+    public MultiResolutionEventProfiler getSamples() {
+        return mrep;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
index 139a985..2116f61 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.control.common.job.profiling.om;
 
-import java.io.ByteArrayInputStream;
 import java.util.Map;
 
 import org.json.JSONArray;
@@ -23,6 +22,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
 
 public class TaskProfile extends AbstractProfile {
     private static final long serialVersionUID = 1L;
@@ -64,24 +64,17 @@
                 ppObj.put("partition-id", pidObj);
                 ppObj.put("open-time", pp.getOpenTime());
                 ppObj.put("close-time", pp.getCloseTime());
-                JSONArray ftArray = new JSONArray();
-                byte[] ftb = pp.getFrameTimes();
-                ByteArrayInputStream bais = new ByteArrayInputStream(ftb);
-                long value = 0;
-                int vLen = 0;
-                long time = pp.getOpenTime();
-                for (int i = 0; i < ftb.length; ++i) {
-                    byte b = (byte) bais.read();
-                    ++vLen;
-                    value += (((long) (b & 0xef)) << ((vLen - 1) * 7));
-                    if ((b & 0x80) == 0) {
-                        time += value;
-                        ftArray.put(time);
-                        vLen = 0;
-                        value = 0;
-                    }
+                MultiResolutionEventProfiler samples = pp.getSamples();
+                ppObj.put("offset", samples.getOffset());
+                int resolution = samples.getResolution();
+                int sampleCount = samples.getCount();
+                JSONArray ftA = new JSONArray();
+                int[] ft = samples.getSamples();
+                for (int i = 0; i < sampleCount; ++i) {
+                    ftA.put(ft[i]);
                 }
-                ppObj.put("frame-times", ftArray);
+                ppObj.put("frame-times", ftA);
+                ppObj.put("resolution", resolution);
                 pspArray.put(ppObj);
             }
             json.put("partition-send-profile", pspArray);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
index ea4dbd6..c7b3bd8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.control.nc.profiling;
 
-import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -23,10 +22,13 @@
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
 import edu.uci.ics.hyracks.control.nc.Task;
 
 public class ProfilingPartitionWriterFactory implements IPartitionWriterFactory {
+    private static final int N_SAMPLES = 1024;
+
     private final IHyracksTaskContext ctx;
 
     private final IConnectorDescriptor cd;
@@ -52,31 +54,17 @@
 
             private long closeTime;
 
-            private long prevTime;
-
-            private ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            MultiResolutionEventProfiler mrep = new MultiResolutionEventProfiler(N_SAMPLES);
 
             @Override
             public void open() throws HyracksDataException {
-                baos.reset();
                 openTime = System.currentTimeMillis();
-                prevTime = openTime;
                 writer.open();
             }
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                long time = System.currentTimeMillis();
-                long diff = time - prevTime;
-                prevTime = time;
-                do {
-                    byte b = (byte) (diff & 0x7f);
-                    diff >>= 7;
-                    if (diff != 0) {
-                        b |= 0x80;
-                    }
-                    baos.write(b);
-                } while (diff != 0);
+                mrep.reportEvent();
                 writer.nextFrame(buffer);
             }
 
@@ -89,8 +77,7 @@
             public void close() throws HyracksDataException {
                 closeTime = System.currentTimeMillis();
                 ((Task) ctx).setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext()
-                        .getJobId(), cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, baos
-                        .toByteArray()));
+                        .getJobId(), cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, mrep));
                 writer.close();
             }
         };