[NO ISSUE] Add performance traces for frame writes
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Add traces for writes to end of local pipeline.
Change-Id: Ib32f4122fdddff1d0dce282a99829e0e0ad820e9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2047
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 4bb2869..ef3f13b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -128,6 +128,7 @@
public static final int JOB_HAS_BEEN_CLEARED_FROM_HISTORY = 92;
public static final int JOB_HAS_NOT_BEEN_CREATED_YET = 93;
public static final int CANNOT_READ_CLOSED_FILE = 94;
+ public static final int TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME = 95;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index ede2f86..40df3d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -111,5 +111,6 @@
92 = Job %1$s has been cleared from job history
93 = Job %1$s has not been created yet
94 = Cannot read closed file (%1$s)
+95 = Tuple of size %1$s cannot fit into an empty frame
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index efdd963..dfb03ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.IntSerDeUtils;
+import org.apache.hyracks.util.trace.Tracer;
/*
* Frame
@@ -115,4 +116,19 @@
}
writer.flush();
}
+
+ public void flush(IFrameWriter writer, Tracer tracer, String name, String cat, String args)
+ throws HyracksDataException {
+ long tid = -1L;
+ if (tracer != null && tracer.isEnabled()) {
+ tid = tracer.durationB(name, cat, args);
+ }
+ if (tupleCount > 0) {
+ write(writer, true);
+ }
+ writer.flush();
+ if (tracer != null && tracer.isEnabled()) {
+ tracer.durationE(tid, args);
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/util/FrameUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/util/FrameUtils.java
index 063afe7..8411ff7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/util/FrameUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/util/FrameUtils.java
@@ -24,10 +24,15 @@
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameTupleAppender;
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.trace.Tracer;
public class FrameUtils {
+ private FrameUtils() {
+ }
+
public static void copyWholeFrame(ByteBuffer srcFrame, ByteBuffer destFrame) {
srcFrame.clear();
destFrame.clear();
@@ -67,7 +72,7 @@
flushedBytes = frameTupleAppender.getBuffer().capacity();
frameTupleAppender.write(writer, true);
if (!frameTupleAppender.appendSkipEmptyField(fieldSlots, bytes, offset, length)) {
- throw new HyracksDataException("The output cannot be fit into a frame.");
+ throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME, length);
}
}
return flushedBytes;
@@ -93,7 +98,7 @@
flushedBytes = frameTupleAppender.getBuffer().capacity();
frameTupleAppender.write(writer, true);
if (!frameTupleAppender.append(bytes, offset, length)) {
- throw new HyracksDataException("The output cannot be fit into a frame.");
+ throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME, length);
}
}
return flushedBytes;
@@ -109,14 +114,14 @@
* @throws HyracksDataException
*/
public static int appendToWriter(IFrameWriter writer, IFrameTupleAppender frameTupleAppender,
- IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
- throws HyracksDataException {
+ IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) throws HyracksDataException {
int flushedBytes = 0;
if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
flushedBytes = frameTupleAppender.getBuffer().capacity();
frameTupleAppender.write(writer, true);
if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
- throw new HyracksDataException("The output cannot be fit into a frame.");
+ throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME,
+ tEndOffset - tStartOffset);
}
}
return flushedBytes;
@@ -137,7 +142,25 @@
flushedBytes = frameTupleAppender.getBuffer().capacity();
frameTupleAppender.write(writer, true);
if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
- throw new HyracksDataException("The output cannot be fit into a frame.");
+ throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME,
+ tupleAccessor.getTupleLength(tIndex));
+ }
+ }
+ return flushedBytes;
+ }
+
+ public static int appendToWriter(IFrameWriter writer, IFrameTupleAppender frameTupleAppender,
+ IFrameTupleAccessor tupleAccessor, int tIndex, Tracer tracer, String name, String cat, String args)
+ throws HyracksDataException {
+ int flushedBytes = 0;
+ if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+ flushedBytes = frameTupleAppender.getBuffer().capacity();
+ long tid = tracer.durationB(name, cat, args);
+ frameTupleAppender.write(writer, true);
+ tracer.durationE(tid, args);
+ if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+ throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME,
+ tupleAccessor.getTupleLength(tIndex));
}
}
return flushedBytes;
@@ -153,8 +176,8 @@
* @return the number of bytes that have been flushed, 0 if not get flushed.
* @throws HyracksDataException
*/
- public static int appendToWriter(IFrameWriter writer, IFrameTupleAppender tupleAppender,
- int[] fieldEndOffsets, byte[] byteArray, int start, int size) throws HyracksDataException {
+ public static int appendToWriter(IFrameWriter writer, IFrameTupleAppender tupleAppender, int[] fieldEndOffsets,
+ byte[] byteArray, int start, int size) throws HyracksDataException {
int flushedBytes = 0;
if (!tupleAppender.append(fieldEndOffsets, byteArray, start, size)) {
@@ -162,7 +185,7 @@
tupleAppender.write(writer, true);
if (!tupleAppender.append(fieldEndOffsets, byteArray, start, size)) {
- throw new HyracksDataException("The output cannot be fit into a frame.");
+ throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME, size);
}
}
return flushedBytes;
@@ -186,7 +209,8 @@
flushedBytes = frameTupleAppender.getBuffer().capacity();
frameTupleAppender.write(writer, true);
if (!frameTupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1)) {
- throw new HyracksDataException("The output cannot be fit into a frame.");
+ throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME,
+ accessor0.getTupleLength(tIndex0) + accessor1.getTupleLength(tIndex1));
}
}
return flushedBytes;
@@ -205,14 +229,19 @@
* @throws HyracksDataException
*/
public static int appendConcatToWriter(IFrameWriter writer, IFrameTupleAppender frameTupleAppender,
- IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
- int dataLen1) throws HyracksDataException {
+ IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1, int dataLen1)
+ throws HyracksDataException {
int flushedBytes = 0;
if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
flushedBytes = frameTupleAppender.getBuffer().capacity();
frameTupleAppender.write(writer, true);
if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
- throw new HyracksDataException("The output cannot be fit into a frame.");
+ int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
+ int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
+ int length0 = endOffset0 - startOffset0;
+ int slotsLen1 = fieldSlots1.length * Integer.BYTES;
+ int length1 = slotsLen1 + dataLen1;
+ throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME, length0 + length1);
}
}
return flushedBytes;
@@ -234,7 +263,13 @@
flushedBytes = frameTupleAppender.getBuffer().capacity();
frameTupleAppender.write(writer, true);
if (!frameTupleAppender.appendProjection(accessor, tIndex, fields)) {
- throw new HyracksDataException("The output cannot be fit into a frame.");
+ int fTargetSlotsLength = fields.length * Integer.BYTES;
+ int length = fTargetSlotsLength;
+ for (int i = 0; i < fields.length; ++i) {
+ length += (accessor.getFieldEndOffset(tIndex, fields[i])
+ - accessor.getFieldStartOffset(tIndex, fields[i]));
+ }
+ throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME, length);
}
}
return flushedBytes;
@@ -249,14 +284,14 @@
* @return the number of bytes that have been flushed, 0 if not get flushed.
* @throws HyracksDataException
*/
- public static int appendFieldToWriter(IFrameWriter writer, IFrameFieldAppender appender, byte[] array,
- int start, int length) throws HyracksDataException {
+ public static int appendFieldToWriter(IFrameWriter writer, IFrameFieldAppender appender, byte[] array, int start,
+ int length) throws HyracksDataException {
int flushedBytes = 0;
if (!appender.appendField(array, start, length)) {
flushedBytes = appender.getBuffer().capacity();
appender.write(writer, true);
if (!appender.appendField(array, start, length)) {
- throw new HyracksDataException("Could not write frame: the size of the tuple is too long");
+ throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME, length);
}
}
return flushedBytes;
@@ -278,10 +313,11 @@
flushedBytes = appender.getBuffer().capacity();
appender.write(writer, true);
if (!appender.appendField(accessor, tid, fid)) {
- throw new HyracksDataException("Could not write frame: the size of the tuple is too long");
+ int fStartOffset = accessor.getFieldStartOffset(tid, fid);
+ int fLen = accessor.getFieldEndOffset(tid, fid) - fStartOffset;
+ throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME, fLen);
}
}
return flushedBytes;
}
-
}