Added profiling visualization. Added multi-resolution profiling
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@903 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index 32a3afd..75cc245 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -62,6 +62,6 @@
return new ActivityId(OperatorDescriptorId.parse(str.substring(0, idIdx)), Integer.parseInt(str
.substring(idIdx + 1)));
}
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("Unable to parse: " + str);
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
index 330e3a2..8794e09 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
@@ -55,6 +55,6 @@
str = str.substring(5);
return new OperatorDescriptorId(Integer.parseInt(str));
}
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("Unable to parse: " + str);
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
index 96437bf..0fb44c1 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
@@ -61,6 +61,6 @@
int idIdx = str.lastIndexOf(':');
return new TaskAttemptId(TaskId.parse(str.substring(0, idIdx)), Integer.parseInt(str.substring(idIdx + 1)));
}
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("Unable to parse: " + str);
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
index 78abaa2..7e0b22d 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -61,6 +61,6 @@
int idIdx = str.lastIndexOf(':');
return new TaskId(ActivityId.parse(str.substring(0, idIdx)), Integer.parseInt(str.substring(idIdx + 1)));
}
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("Unable to parse: " + str);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
index 3a86c42..8d244e8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
@@ -23,13 +23,13 @@
import java.util.Map;
import org.apache.wicket.markup.html.basic.Label;
-import org.apache.wicket.markup.html.panel.EmptyPanel;
import org.apache.wicket.request.mapper.parameter.PageParameters;
import org.apache.wicket.util.string.StringValue;
import org.json.JSONArray;
import org.json.JSONObject;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -41,7 +41,6 @@
private static final long serialVersionUID = 1L;
private static final int HEIGHT = 29;
- private static final int WIDTH = 9;
public JobDetailsPage(PageParameters params) throws Exception {
ClusterControllerService ccs = getAdminConsoleApplication().getClusterControllerService();
@@ -119,10 +118,11 @@
tcAttempts[k].tasks = new TaskAttempt[taArray.length()];
for (int l = 0; l < taArray.length(); ++l) {
JSONObject taO = taArray.getJSONObject(l);
- TaskAttempt ta = new TaskAttempt(taO.getString("task-id"),
- taO.getLong("start-time"), taO.getLong("end-time"));
+ TaskAttemptId taId = TaskAttemptId.parse(taO.getString("task-attempt-id"));
+ TaskAttempt ta = new TaskAttempt(taId, taO.getLong("start-time"),
+ taO.getLong("end-time"));
tcAttempts[k].tasks[l] = ta;
- TaskId tid = TaskId.parse(taO.getString("task-id"));
+ TaskId tid = taId.getTaskId();
ta.name = activityMap.get(tid.getActivityId());
ta.partition = tid.getPartition();
}
@@ -149,6 +149,46 @@
}
}
+ Map<TaskAttemptId, TaskProfile> tpMap = new HashMap<TaskAttemptId, TaskProfile>();
+ if (jrO.has("profile")) {
+ JSONObject pO = jrO.getJSONObject("profile");
+ if (pO.has("joblets")) {
+ JSONArray jobletsA = pO.getJSONArray("joblets");
+ for (int i = 0; i < jobletsA.length(); ++i) {
+ JSONObject jobletO = jobletsA.getJSONObject(i);
+ if (jobletO.has("tasks")) {
+ JSONArray tasksA = jobletO.getJSONArray("tasks");
+ for (int j = 0; j < tasksA.length(); ++j) {
+ JSONObject taskO = tasksA.getJSONObject(j);
+ ActivityId activityId = ActivityId.parse(taskO.getString("activity-id"));
+ int partition = taskO.getInt("partition");
+ int attempt = taskO.getInt("attempt");
+ TaskAttemptId taId = new TaskAttemptId(new TaskId(activityId, partition), attempt);
+ if (taskO.has("partition-send-profile")) {
+ JSONArray taskProfilesA = taskO.getJSONArray("partition-send-profile");
+ for (int k = 0; k < taskProfilesA.length(); ++k) {
+ JSONObject ppO = taskProfilesA.getJSONObject(k);
+ long openTime = ppO.getLong("open-time");
+ long closeTime = ppO.getLong("close-time");
+ int resolution = ppO.getInt("resolution");
+ long offset = ppO.getLong("offset");
+ JSONArray frameTimesA = ppO.getJSONArray("frame-times");
+ long[] frameTimes = new long[frameTimesA.length()];
+ for (int l = 0; l < frameTimes.length; ++l) {
+ frameTimes[l] = frameTimesA.getInt(l) + offset;
+ }
+ TaskProfile tp = new TaskProfile(taId, openTime, closeTime, frameTimes, resolution);
+ if (!tpMap.containsKey(tp.taId)) {
+ tpMap.put(tp.taId, tp);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
if (!tcList.isEmpty()) {
Collections.sort(tcList, new Comparator<TaskClusterAttempt[]>() {
@Override
@@ -188,11 +228,42 @@
buffer.append("<rect x=\"").append(tStartTime * width + leftOffset).append("\" y=\"")
.append(y * (HEIGHT + 1) + HEIGHT / 4).append("\" width=\"")
.append(width * (tEndTime - tStartTime)).append("\" height=\"").append(HEIGHT / 2)
- .append("\"/>\n");
+ .append("\" style=\"fill:rgb(255,255,255);stroke-width:1;stroke:rgb(0,0,0)\"/>\n");
buffer.append("<text x=\"").append(tEndTime * width + leftOffset + 20).append("\" y=\"")
.append(y * (HEIGHT + 1) + HEIGHT * 3 / 4).append("\">")
.append((tEndTime - tStartTime) + " ms (" + ta.name + ":" + ta.partition + ")")
.append("</text>\n");
+ TaskProfile tp = tpMap.get(ta.taId);
+ if (tp != null) {
+ for (int k = 0; k < tp.frameTimes.length; ++k) {
+ long taOpenTime = tp.openTime - minTime;
+ buffer.append("<rect x=\"")
+ .append(taOpenTime * width + leftOffset)
+ .append("\" y=\"")
+ .append(y * (HEIGHT + 1) + HEIGHT / 4)
+ .append("\" width=\"1\" height=\"")
+ .append(HEIGHT / 2)
+ .append("\" style=\"fill:rgb(255,0,0);stroke-width:1;stroke:rgb(255,0,0)\"/>\n");
+ for (long ft : tp.frameTimes) {
+ long taNextTime = ft - minTime;
+ buffer.append("<rect x=\"")
+ .append(taNextTime * width + leftOffset)
+ .append("\" y=\"")
+ .append(y * (HEIGHT + 1) + HEIGHT / 4)
+ .append("\" width=\"1\" height=\"")
+ .append(HEIGHT / 2)
+ .append("\" style=\"fill:rgb(0,255,0);stroke-width:1;stroke:rgb(0,255,0)\"/>\n");
+ }
+ long taCloseTime = tp.closeTime - minTime;
+ buffer.append("<rect x=\"")
+ .append(taCloseTime * width + leftOffset)
+ .append("\" y=\"")
+ .append(y * (HEIGHT + 1) + HEIGHT / 4)
+ .append("\" width=\"1\" height=\"")
+ .append(HEIGHT / 2)
+ .append("\" style=\"fill:rgb(0,0,255);stroke-width:1;stroke:rgb(0,0,255)\"/>\n");
+ }
+ }
++y;
}
}
@@ -209,96 +280,17 @@
markup.setEscapeModelStrings(false);
add(markup);
}
-
- List<TaskProfile> taskProfiles = new ArrayList<TaskProfile>();
- if (jrO.has("profile")) {
- JSONObject pO = jrO.getJSONObject("profile");
- if (pO.has("joblets")) {
- JSONArray jobletsA = pO.getJSONArray("joblets");
- for (int i = 0; i < jobletsA.length(); ++i) {
- JSONObject jobletO = jobletsA.getJSONObject(i);
- if (jobletO.has("tasks")) {
- JSONArray tasksA = jobletO.getJSONArray("tasks");
- for (int j = 0; j < tasksA.length(); ++j) {
- JSONObject taskO = tasksA.getJSONObject(j);
- String activityId = taskO.getString("activity-id");
- int partition = taskO.getInt("partition");
- int attempt = taskO.getInt("attempt");
- if (taskO.has("partition-send-profile")) {
- JSONArray taskProfilesA = taskO.getJSONArray("partition-send-profile");
- for (int k = 0; k < taskProfilesA.length(); ++k) {
- JSONObject ppO = taskProfilesA.getJSONObject(k);
- long openTime = ppO.getLong("open-time");
- long closeTime = ppO.getLong("close-time");
- JSONArray frameTimesA = ppO.getJSONArray("frame-times");
- long[] frameTimes = new long[frameTimesA.length()];
- for (int l = 0; l < frameTimes.length; ++l) {
- frameTimes[l] = frameTimesA.getLong(l);
- }
- taskProfiles.add(new TaskProfile(activityId, partition, attempt, openTime,
- closeTime, frameTimes));
- }
- }
- }
- }
- }
- }
- }
- if (!taskProfiles.isEmpty()) {
- Collections.sort(taskProfiles, new Comparator<TaskProfile>() {
- @Override
- public int compare(TaskProfile o1, TaskProfile o2) {
- return o1.openTime < o2.openTime ? -1 : (o1.openTime > o2.openTime ? 1 : 0);
- }
- });
- long startTime = taskProfiles.get(0).openTime;
- long timeRange = taskProfiles.get(taskProfiles.size() - 1).closeTime - startTime;
- int n = taskProfiles.size();
- StringBuilder buffer = new StringBuilder();
- buffer.append("<svg viewBox=\"0 0 ").append((timeRange + 1) * (WIDTH + 1)).append(' ')
- .append((n + 1) * (HEIGHT + 1)).append("\" version=\"1.1\"\n");
- buffer.append("xmlns=\"http://www.w3.org/2000/svg\">\n");
- for (int i = 0; i < n; ++i) {
- TaskProfile tp = taskProfiles.get(i);
- open(buffer, i, tp.openTime - startTime);
- for (long ft : tp.frameTimes) {
- nextFrame(buffer, i, ft - startTime);
- }
- close(buffer, i, tp.closeTime - startTime);
- }
- buffer.append("</svg>");
- Label markup = new Label("job-timeline-profile", buffer.toString());
- markup.setEscapeModelStrings(false);
- add(markup);
- } else {
- add(new EmptyPanel("job-timeline-profile"));
- }
- }
-
- private void open(StringBuilder buffer, int i, long openTime) {
- buffer.append("<rect x=\"").append(openTime * (WIDTH + 1)).append("\" y=\"").append(i * (HEIGHT + 1))
- .append("\" width=\"").append(WIDTH).append("\" height=\"").append(HEIGHT).append("\"/>\n");
- }
-
- private void close(StringBuilder buffer, int i, long closeTime) {
- buffer.append("<rect x=\"").append(closeTime * (WIDTH + 1)).append("\" y=\"").append(i * (HEIGHT + 1))
- .append("\" width=\"").append(WIDTH).append("\" height=\"").append(HEIGHT).append("\"/>\n");
- }
-
- private void nextFrame(StringBuilder buffer, int i, long frameTime) {
- buffer.append("<rect x=\"").append(frameTime * (WIDTH + 1)).append("\" y=\"").append(i * (HEIGHT + 1))
- .append("\" width=\"").append(WIDTH).append("\" height=\"").append(HEIGHT).append("\"/>\n");
}
private static class TaskAttempt {
- private String taskId;
+ private TaskAttemptId taId;
private long startTime;
private long endTime;
private String name;
private int partition;
- public TaskAttempt(String taskId, long startTime, long endTime) {
- this.taskId = taskId;
+ public TaskAttempt(TaskAttemptId taId, long startTime, long endTime) {
+ this.taId = taId;
this.startTime = startTime;
this.endTime = endTime;
}
@@ -320,21 +312,18 @@
}
private static class TaskProfile {
- private String activityId;
- private int partition;
- private int attempt;
+ private TaskAttemptId taId;
private long openTime;
private long closeTime;
private long[] frameTimes;
+ private int resolution;
- public TaskProfile(String activityId, int partition, int attempt, long openTime, long closeTime,
- long[] frameTimes) {
- this.activityId = activityId;
- this.partition = partition;
- this.activityId = activityId;
+ public TaskProfile(TaskAttemptId taId, long openTime, long closeTime, long[] frameTimes, int resolution) {
+ this.taId = taId;
this.openTime = openTime;
this.closeTime = closeTime;
this.frameTimes = frameTimes;
+ this.resolution = resolution;
}
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.html b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.html
index 8c5578d..d2068fd 100644
--- a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.html
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.html
@@ -6,5 +6,4 @@
<div id="job-run" wicket:id="job-run" style="display: none;">
</div>
<div wicket:id="job-timeline" style="overflow: auto;"></div>
- <div wicket:id="job-timeline-profile"></div>
</wicket:extend>
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
new file mode 100644
index 0000000..f8fba7a
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.counters;
+
+import java.io.Serializable;
+
+public class MultiResolutionEventProfiler implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final int[] times;
+
+ private long offset;
+
+ private int ptr;
+
+ private int resolution;
+
+ private int eventCounter;
+
+ public MultiResolutionEventProfiler(int nSamples) {
+ times = new int[nSamples];
+ offset = -1;
+ ptr = 0;
+ resolution = 1;
+ eventCounter = 0;
+ }
+
+ public void reportEvent() {
+ ++eventCounter;
+ if (eventCounter % resolution != 0) {
+ return;
+ }
+ if (ptr >= times.length) {
+ compact();
+ return;
+ }
+ eventCounter = 0;
+ long time = System.currentTimeMillis();
+ if (offset < 0) {
+ offset = time;
+ }
+ int value = (int) (time - offset);
+ times[ptr++] = value;
+ }
+
+ private void compact() {
+ for (int i = 1; i < ptr / 2; ++i) {
+ times[i] = times[i * 2];
+ }
+ resolution <<= 1;
+ ptr >>= 1;
+ }
+
+ public int getResolution() {
+ return resolution;
+ }
+
+ public int getCount() {
+ return ptr;
+ }
+
+ public int[] getSamples() {
+ return times;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
index f6568d9..ef61796 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
@@ -17,6 +17,7 @@
import java.io.Serializable;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
public class PartitionProfile implements Serializable {
private static final long serialVersionUID = 1L;
@@ -27,13 +28,13 @@
private final long closeTime;
- private final byte[] frameTimes;
+ private final MultiResolutionEventProfiler mrep;
- public PartitionProfile(PartitionId pid, long openTime, long closeTime, byte[] frameTimes) {
+ public PartitionProfile(PartitionId pid, long openTime, long closeTime, MultiResolutionEventProfiler mrep) {
this.pid = pid;
this.openTime = openTime;
this.closeTime = closeTime;
- this.frameTimes = frameTimes;
+ this.mrep = mrep;
}
public PartitionId getPartitionId() {
@@ -48,7 +49,7 @@
return closeTime;
}
- public byte[] getFrameTimes() {
- return frameTimes;
+ public MultiResolutionEventProfiler getSamples() {
+ return mrep;
}
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
index 139a985..2116f61 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.control.common.job.profiling.om;
-import java.io.ByteArrayInputStream;
import java.util.Map;
import org.json.JSONArray;
@@ -23,6 +22,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
public class TaskProfile extends AbstractProfile {
private static final long serialVersionUID = 1L;
@@ -64,24 +64,17 @@
ppObj.put("partition-id", pidObj);
ppObj.put("open-time", pp.getOpenTime());
ppObj.put("close-time", pp.getCloseTime());
- JSONArray ftArray = new JSONArray();
- byte[] ftb = pp.getFrameTimes();
- ByteArrayInputStream bais = new ByteArrayInputStream(ftb);
- long value = 0;
- int vLen = 0;
- long time = pp.getOpenTime();
- for (int i = 0; i < ftb.length; ++i) {
- byte b = (byte) bais.read();
- ++vLen;
- value += (((long) (b & 0xef)) << ((vLen - 1) * 7));
- if ((b & 0x80) == 0) {
- time += value;
- ftArray.put(time);
- vLen = 0;
- value = 0;
- }
+ MultiResolutionEventProfiler samples = pp.getSamples();
+ ppObj.put("offset", samples.getOffset());
+ int resolution = samples.getResolution();
+ int sampleCount = samples.getCount();
+ JSONArray ftA = new JSONArray();
+ int[] ft = samples.getSamples();
+ for (int i = 0; i < sampleCount; ++i) {
+ ftA.put(ft[i]);
}
- ppObj.put("frame-times", ftArray);
+ ppObj.put("frame-times", ftA);
+ ppObj.put("resolution", resolution);
pspArray.put(ppObj);
}
json.put("partition-send-profile", pspArray);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
index ea4dbd6..c7b3bd8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.control.nc.profiling;
-import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -23,10 +22,13 @@
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
import edu.uci.ics.hyracks.control.nc.Task;
public class ProfilingPartitionWriterFactory implements IPartitionWriterFactory {
+ private static final int N_SAMPLES = 1024;
+
private final IHyracksTaskContext ctx;
private final IConnectorDescriptor cd;
@@ -52,31 +54,17 @@
private long closeTime;
- private long prevTime;
-
- private ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ MultiResolutionEventProfiler mrep = new MultiResolutionEventProfiler(N_SAMPLES);
@Override
public void open() throws HyracksDataException {
- baos.reset();
openTime = System.currentTimeMillis();
- prevTime = openTime;
writer.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- long time = System.currentTimeMillis();
- long diff = time - prevTime;
- prevTime = time;
- do {
- byte b = (byte) (diff & 0x7f);
- diff >>= 7;
- if (diff != 0) {
- b |= 0x80;
- }
- baos.write(b);
- } while (diff != 0);
+ mrep.reportEvent();
writer.nextFrame(buffer);
}
@@ -89,8 +77,7 @@
public void close() throws HyracksDataException {
closeTime = System.currentTimeMillis();
((Task) ctx).setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext()
- .getJobId(), cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, baos
- .toByteArray()));
+ .getJobId(), cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, mrep));
writer.close();
}
};