add log to watch the tuple size
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index 0d51035..ff9da9c 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -4,6 +4,10 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.genomix.hyracks.driver.Driver;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -22,6 +26,7 @@
     private static final long serialVersionUID = 1L;
 
     private final int ValidPosCount;
+    private static final Log LOG = LogFactory.getLog(Driver.class);
 
     public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
         ValidPosCount = getPositionCount(readLength, kmerLength);
@@ -45,6 +50,7 @@
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
             throws HyracksDataException {
+        final int frameSize = ctx.getFrameSize();
         return new IAggregatorDescriptor() {
 
             class PositionArray {
@@ -168,6 +174,7 @@
                 }
                 DataOutput fieldOutput = tupleBuilder.getDataOutput();
                 try {
+                    int totalSize = 0;
                     for (int i = 0; i < ValidPosCount; i++) {
                         fieldOutput.write(positionArray.forwardStorages[i].getByteArray(),
                                 positionArray.forwardStorages[i].getStartOffset(),
@@ -178,6 +185,16 @@
                                 positionArray.reverseStorages[i].getStartOffset(),
                                 positionArray.reverseStorages[i].getLength());
                         tupleBuilder.addFieldEndOffset();
+
+                        totalSize += positionArray.forwardStorages[i].getLength()
+                                + positionArray.reverseStorages[i].getLength();
+                    }
+                    if (totalSize > frameSize / 2) {
+                        int leadbyte = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+                        int readID = accessor.getBuffer().getInt(
+                                leadbyte + accessor.getFieldStartOffset(tIndex, InputReadIDField));
+                        LOG.warn("MergeReadID on read:" + readID + " is of size: " + totalSize + ", current frameSize:"
+                                + frameSize + "\n Recommendate to enlarge the FrameSize");
                     }
                 } catch (IOException e) {
                     throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");