add log to watch the tuple size
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index 0d51035..ff9da9c 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -4,6 +4,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.genomix.hyracks.driver.Driver;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -22,6 +26,7 @@
private static final long serialVersionUID = 1L;
private final int ValidPosCount;
+ private static final Log LOG = LogFactory.getLog(Driver.class);
public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
ValidPosCount = getPositionCount(readLength, kmerLength);
@@ -45,6 +50,7 @@
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
+ final int frameSize = ctx.getFrameSize();
return new IAggregatorDescriptor() {
class PositionArray {
@@ -168,6 +174,7 @@
}
DataOutput fieldOutput = tupleBuilder.getDataOutput();
try {
+ int totalSize = 0;
for (int i = 0; i < ValidPosCount; i++) {
fieldOutput.write(positionArray.forwardStorages[i].getByteArray(),
positionArray.forwardStorages[i].getStartOffset(),
@@ -178,6 +185,16 @@
positionArray.reverseStorages[i].getStartOffset(),
positionArray.reverseStorages[i].getLength());
tupleBuilder.addFieldEndOffset();
+
+ totalSize += positionArray.forwardStorages[i].getLength()
+ + positionArray.reverseStorages[i].getLength();
+ }
+ if (totalSize > frameSize / 2) {
+ int leadbyte = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ int readID = accessor.getBuffer().getInt(
+ leadbyte + accessor.getFieldStartOffset(tIndex, InputReadIDField));
+ LOG.warn("MergeReadID on read:" + readID + " is of size: " + totalSize + ", current frameSize:"
+ + frameSize + "\n Recommendate to enlarge the FrameSize");
}
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");