Fix Upsert Pipeline

Change-Id: I5c19d448f9664ecaeac600668a6dbdcf40673c56
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1308
Reviewed-by: Ian Maxon <imaxon@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
index 304eb0c..64cca8c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
@@ -43,11 +43,6 @@
         this.fta = fta;
     }
 
-    public void prettyPrint(ByteBuffer frame) {
-        fta.reset(frame);
-        fta.prettyPrint();
-    }
-
     @Override
     public ByteBuffer handle(HyracksDataException th, ByteBuffer frame) {
         try {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index e05aa25..f6eeb66 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -26,10 +26,10 @@
 import java.util.Map;
 
 import org.apache.asterix.common.config.AsterixStorageProperties;
-import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.context.TransactionSubsystemProvider;
@@ -42,8 +42,8 @@
 import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
 import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
@@ -1097,7 +1097,7 @@
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
             List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
             JobSpecification spec) throws AlgebricksException {
@@ -1119,22 +1119,22 @@
         int i = 0;
         // set the keys' permutations
         for (LogicalVariable varKey : primaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
+            int idx = inputSchema.findVariable(varKey);
             fieldPermutation[i] = idx;
             bloomFilterKeyFields[i] = i;
             i++;
         }
         // set the record permutation
-        fieldPermutation[i++] = propagatedSchema.findVariable(payload);
+        fieldPermutation[i++] = inputSchema.findVariable(payload);
         // set the filters' permutations.
         if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(filterKeys.get(0));
+            int idx = inputSchema.findVariable(filterKeys.get(0));
             fieldPermutation[i++] = idx;
         }
 
         if (additionalNonFilterFields != null) {
             for (LogicalVariable var : additionalNonFilterFields) {
-                int idx = propagatedSchema.findVariable(var);
+                int idx = inputSchema.findVariable(var);
                 fieldPermutation[i++] = idx;
             }
         }
@@ -1195,22 +1195,19 @@
                     + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
             ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
                     + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-            for (int j = 0; j < recordDesc.getFieldCount(); j++) {
-                outputTypeTraits[j] = recordDesc.getTypeTraits()[j];
-                outputSerDes[j] = recordDesc.getFields()[j];
-            }
-            outputSerDes[outputSerDes.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils
-                    .getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
-            outputTypeTraits[outputTypeTraits.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils
-                    .getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
 
+            // add the previous record first
+            int f = 0;
+            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+            f++;
+            // add the previous meta second
             if (dataset.hasMetaPart()) {
-                outputSerDes[outputSerDes.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
-                        .getSerdeProvider().getSerializerDeserializer(metaItemType);
-                outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
-                        .getTypeTraitProvider().getTypeTrait(metaItemType);
+                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(
+                        metaItemType);
+                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+                f++;
             }
-
+            // add the previous filter third
             int fieldIdx = -1;
             if (numFilterFields > 0) {
                 String filterField = DatasetUtils.getFilterField(dataset).get(0);
@@ -1220,10 +1217,15 @@
                     }
                 }
                 fieldIdx = i;
-                outputTypeTraits[outputTypeTraits.length - 1] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
-                        .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
-                outputSerDes[outputSerDes.length - 1] = FormatUtils.getDefaultFormat().getSerdeProvider()
+                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType
+                        .getFieldTypes()[fieldIdx]);
+                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
                         .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+                f++;
+            }
+            for (int j = 0; j < recordDesc.getFieldCount(); j++) {
+                outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
+                outputSerDes[j + f] = recordDesc.getFields()[j];
             }
 
             RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index afd6019..96f9e76 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -78,6 +78,8 @@
     private ARecordPointable recPointable;
     private DataOutput prevDos;
     private final boolean hasMeta;
+    private final int filterFieldIndex;
+    private final int metaFieldIndex;
 
     public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
@@ -93,6 +95,8 @@
         key.setFieldPermutation(searchKeyPermutations);
         hasMeta = (fieldPermutation.length > numOfPrimaryKeys + 1) && (filterFieldIndex < 0
                 || (filterFieldIndex >= 0 && (fieldPermutation.length > numOfPrimaryKeys + 2)));
+        this.metaFieldIndex = numOfPrimaryKeys + 1;
+        this.filterFieldIndex = numOfPrimaryKeys + (hasMeta ? 2 : 1);
         if (filterFieldIndex >= 0) {
             isFiltered = true;
             this.recordType = recordType;
@@ -101,7 +105,6 @@
             this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
             this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
         }
-
     }
 
     // we have the permutation which has [pk locations, record location, optional:filter-location]
@@ -141,8 +144,8 @@
                     .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
-            IAsterixAppRuntimeContext runtimeCtx =
-                    (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                    .getApplicationContext().getApplicationObject();
             AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                     runtimeCtx.getTransactionSubsystem().getLogManager());
         } catch (Exception e) {
@@ -156,41 +159,12 @@
         searchPred.reset(key, key, true, true, keySearchCmp, keySearchCmp);
     }
 
-    private void writeOutput(int tupleIndex, boolean recordWasInserted) throws IOException {
-        boolean recordWasDeleted = prevTuple != null;
-        tb.reset();
+    private void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted) throws IOException {
         frameTuple.reset(accessor, tupleIndex);
         for (int i = 0; i < frameTuple.getFieldCount(); i++) {
             dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
             tb.addFieldEndOffset();
         }
-        if (recordWasDeleted) {
-            dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
-                    prevTuple.getFieldLength(numOfPrimaryKeys));
-            tb.addFieldEndOffset();
-            // if has meta, then append meta
-            if (hasMeta) {
-                dos.write(prevTuple.getFieldData(numOfPrimaryKeys + 1), prevTuple.getFieldStart(numOfPrimaryKeys + 1),
-                        prevTuple.getFieldLength(numOfPrimaryKeys + 1));
-                tb.addFieldEndOffset();
-            }
-            // if with filters, append the filter
-            if (isFiltered) {
-                dos.write(prevTuple.getFieldData(numOfPrimaryKeys + (hasMeta ? 2 : 1)),
-                        prevTuple.getFieldStart(numOfPrimaryKeys + (hasMeta ? 2 : 1)),
-                        prevTuple.getFieldLength(numOfPrimaryKeys + (hasMeta ? 2 : 1)));
-                tb.addFieldEndOffset();
-            }
-        } else {
-            addNullField();
-            if (hasMeta) {
-                addNullField();
-            }
-            // if with filters, append null
-            if (isFiltered) {
-                addNullField();
-            }
-        }
         if (recordWasInserted || recordWasDeleted) {
             FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
         }
@@ -214,6 +188,7 @@
         int i = 0;
         try {
             while (i < tupleCount) {
+                tb.reset();
                 boolean recordWasInserted = false;
                 tuple.reset(accessor, i);
                 resetSearchPredicate(i);
@@ -222,10 +197,26 @@
                     cursor.next();
                     prevTuple = cursor.getTuple();
                     cursor.reset();
-                    modCallback.setOp(Operation.DELETE);
                     if (isFiltered) {
                         prevTuple = getPrevTupleWithFilter(prevTuple);
                     }
+                    dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
+                            prevTuple.getFieldLength(numOfPrimaryKeys));
+                    tb.addFieldEndOffset();
+                    // if has meta, then append meta
+                    if (hasMeta) {
+                        dos.write(prevTuple.getFieldData(metaFieldIndex), prevTuple.getFieldStart(metaFieldIndex),
+                                prevTuple.getFieldLength(metaFieldIndex));
+                        tb.addFieldEndOffset();
+                    }
+                    // if with filters, append the filter
+                    if (isFiltered) {
+                        dos.write(prevTuple.getFieldData(filterFieldIndex),
+                                prevTuple.getFieldStart(filterFieldIndex),
+                                prevTuple.getFieldLength(filterFieldIndex));
+                        tb.addFieldEndOffset();
+                    }
+                    modCallback.setOp(Operation.DELETE);
                     if (i == 0) {
                         lsmAccessor.delete(prevTuple);
                     } else {
@@ -233,6 +224,14 @@
                     }
                 } else {
                     prevTuple = null;
+                    addNullField();
+                    if (hasMeta) {
+                        addNullField();
+                    }
+                    // if with filters, append null
+                    if (isFiltered) {
+                        addNullField();
+                    }
                     cursor.reset();
                 }
                 if (!isNull(tuple, numOfPrimaryKeys)) {
@@ -244,7 +243,7 @@
                     }
                     recordWasInserted = true;
                 }
-                writeOutput(i, recordWasInserted);
+                writeOutput(i, recordWasInserted, prevTuple != null);
                 i++;
             }
             appender.write(writer, true);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 68710a5..6947942 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -193,7 +193,7 @@
     public IFunctionInfo lookupFunction(FunctionIdentifier fid);
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
-            IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+            IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
             LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterFields,
             List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
             JobSpecification jobSpec) throws AlgebricksException;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index 5dc327a..9838c12 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -84,7 +84,6 @@
     @Override
     public void recomputeSchema() throws AlgebricksException {
         schema = new ArrayList<LogicalVariable>();
-        schema.addAll(inputs.get(0).getValue().getSchema());
         if (operation == Kind.UPSERT) {
             // The upsert case also produces the previous record
             schema.add(prevRecordVar);
@@ -95,6 +94,7 @@
                 schema.add(prevFilterVar);
             }
         }
+        schema.addAll(inputs.get(0).getValue().getSchema());
     }
 
     public void getProducedVariables(Collection<LogicalVariable> producedVariables) {
@@ -146,7 +146,6 @@
             @Override
             public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
                     throws AlgebricksException {
-                target.addAllVariables(sources[0]);
                 if (operation == Kind.UPSERT) {
                     target.addVariable(prevRecordVar);
                     if (prevAdditionalNonFilteringVars != null) {
@@ -158,6 +157,7 @@
                         target.addVariable(prevFilterVar);
                     }
                 }
+                target.addAllVariables(sources[0]);
             }
         };
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 28f4e5e..59ccd84 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -34,11 +34,11 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -295,7 +295,12 @@
 
     @Override
     public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
-        standardLayout(op);
+        // produced first
+        VariableUtilities.getProducedVariables(op, schemaVariables);
+        // then propagated
+        for (Mutable<ILogicalOperator> c : op.getInputs()) {
+            VariableUtilities.getLiveVariables(c.getValue(), schemaVariables);
+        }
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 3c9cddf..6ded4a3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -114,7 +114,7 @@
             runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
                     additionalFilteringKeys, inputDesc, context, spec);
         } else if (operation == Kind.UPSERT) {
-            runtimeAndConstraints = mp.getUpsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+            runtimeAndConstraints = mp.getUpsertRuntime(dataSource, inputSchemas[0], typeEnv, keys, payload,
                     additionalFilteringKeys, additionalNonFilteringFields, inputDesc, context, spec);
         } else {
             throw new AlgebricksException("Unsupported Operation " + operation);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index d99e2f2..cefada7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -18,18 +18,12 @@
  */
 package org.apache.hyracks.dataflow.common.comm.io;
 
-import java.io.DataInputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 
 import org.apache.hyracks.api.comm.FrameConstants;
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.util.IntSerDeUtils;
 
 /**
@@ -118,132 +112,8 @@
         return getFieldCount() * FrameConstants.SIZE_LEN;
     }
 
-    public void prettyPrint(String prefix) {
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream dis = new DataInputStream(bbis);
-        int tc = getTupleCount();
-        StringBuilder sb = new StringBuilder();
-        sb.append(prefix).append("TC: " + tc).append("\n");
-        for (int i = 0; i < tc; ++i) {
-            prettyPrint(i, bbis, dis, sb);
-        }
-        System.err.println(sb.toString());
-    }
-
-    public void prettyPrint() {
-        prettyPrint("");
-    }
-
-    protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb) {
-        sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
-        for (int j = 0; j < getFieldCount(); ++j) {
-            sb.append(" ");
-            if (j > 0) {
-                sb.append("|");
-            }
-            sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
-            sb.append("{");
-            bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
-            try {
-                sb.append(recordDescriptor.getFields()[j].deserialize(dis));
-            } catch (Exception e) {
-                e.printStackTrace();
-                sb.append("Failed to deserialize field" + j);
-            }
-            sb.append("}");
-        }
-        sb.append("\n");
-    }
-
-    public void prettyPrint(int tid) {
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream dis = new DataInputStream(bbis);
-        StringBuilder sb = new StringBuilder();
-        prettyPrint(tid, bbis, dis, sb);
-        System.err.println(sb.toString());
-    }
-
     @Override
     public int getFieldCount() {
         return recordDescriptor.getFieldCount();
     }
-
-    /*
-     * The two methods below can be used for debugging.
-     * They are safe as they don't print records. Printing records
-     * using IserializerDeserializer can print incorrect results or throw exceptions.
-     * A better way yet would be to use record pointable.
-     */
-    public void prettyPrint(String prefix, int[] recordFields) throws IOException {
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream dis = new DataInputStream(bbis);
-        int tc = getTupleCount();
-        StringBuilder sb = new StringBuilder();
-        sb.append(prefix).append("TC: " + tc).append("\n");
-        for (int i = 0; i < tc; ++i) {
-            prettyPrint(i, bbis, dis, sb, recordFields);
-        }
-        System.err.println(sb.toString());
-    }
-
-    public void prettyPrint(int tIdx, int[] recordFields) throws IOException {
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream dis = new DataInputStream(bbis);
-        StringBuilder sb = new StringBuilder();
-        prettyPrint(tIdx, bbis, dis, sb, recordFields);
-        System.err.println(sb.toString());
-    }
-
-    public void prettyPrint(ITupleReference tuple, int fieldsIdx, int descIdx) throws HyracksDataException {
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream dis = new DataInputStream(bbis);
-        StringBuilder sb = new StringBuilder();
-        sb.append("[");
-        sb.append("f" + fieldsIdx + ":(" + tuple.getFieldStart(fieldsIdx) + ", "
-                + (tuple.getFieldLength(fieldsIdx) + tuple.getFieldStart(fieldsIdx)) + ") ");
-        sb.append("{");
-        ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(fieldsIdx));
-        bbis.setByteBuffer(bytebuff, tuple.getFieldStart(fieldsIdx));
-        sb.append(recordDescriptor.getFields()[descIdx].deserialize(dis));
-        sb.append("}");
-        sb.append("\n");
-        System.err.println(sb.toString());
-    }
-
-    public void prettyPrint(ITupleReference tuple, int[] descF) throws HyracksDataException {
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream dis = new DataInputStream(bbis);
-        StringBuilder sb = new StringBuilder();
-        sb.append("[");
-        for (int j = 0; j < descF.length; ++j) {
-            sb.append("f" + j + ":(" + tuple.getFieldStart(j) + ", "
-                    + (tuple.getFieldLength(j) + tuple.getFieldStart(j)) + ") ");
-            sb.append("{");
-            ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(j));
-            bbis.setByteBuffer(bytebuff, tuple.getFieldStart(j));
-            sb.append(recordDescriptor.getFields()[descF[j]].deserialize(dis));
-            sb.append("}");
-        }
-        sb.append("\n");
-        System.err.println(sb.toString());
-    }
-
-    protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb,
-            int[] recordFields) throws IOException {
-        Arrays.sort(recordFields);
-        sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
-        for (int j = 0; j < getFieldCount(); ++j) {
-            sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
-            sb.append("{");
-            bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
-            if (Arrays.binarySearch(recordFields, j) >= 0) {
-                sb.append("{a record field: only print using pointable:");
-                sb.append("tag->" + dis.readByte() + "}");
-            } else {
-                sb.append(recordDescriptor.getFields()[j].deserialize(dis));
-            }
-            sb.append("}");
-        }
-        sb.append("\n");
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java
index d05b3ed..874ac46 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java
@@ -18,10 +18,6 @@
  */
 package org.apache.hyracks.dataflow.common.comm.io;
 
-import java.io.DataInputStream;
-
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
 public class ResultFrameTupleAccessor extends FrameTupleAccessor {
 
     public ResultFrameTupleAccessor() {
@@ -29,16 +25,6 @@
     }
 
     @Override
-    protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb) {
-        sb.append(tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
-
-        bbis.setByteBuffer(getBuffer(), getTupleStartOffset(tid));
-        sb.append(dis);
-
-        sb.append("]\n");
-    }
-
-    @Override
     public int getFieldCount() {
         return 1;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/FrameDebugUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/FrameDebugUtils.java
new file mode 100644
index 0000000..aa27c42
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/FrameDebugUtils.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.util;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * A Util class used for inspecting frames
+ * for debugging purposes
+ */
+public class FrameDebugUtils {
+    private FrameDebugUtils() {
+    }
+
+    /**
+     * Debugging method
+     * @param fta
+     * @param recordDescriptor
+     * @param prefix
+     */
+    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String prefix) {
+        try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+                DataInputStream dis = new DataInputStream(bbis)) {
+            int tc = fta.getTupleCount();
+            StringBuilder sb = new StringBuilder();
+            sb.append(prefix).append("TC: " + tc).append("\n");
+            for (int i = 0; i < tc; ++i) {
+                prettyPrint(fta, recordDescriptor, i, bbis, dis, sb);
+            }
+            System.err.println(sb.toString());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Debugging method
+     * @param fta
+     * @param recordDescriptor
+     */
+    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor) {
+        prettyPrint(fta, recordDescriptor, "");
+    }
+
+    /**
+     * Debugging method
+     * @param fta
+     * @param operator
+     */
+    public void prettyPrintTags(IFrameTupleAccessor fta, String operator) {
+        try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+                DataInputStream dis = new DataInputStream(bbis)) {
+            int tc = fta.getTupleCount();
+            StringBuilder sb = new StringBuilder();
+            sb.append(operator + ":");
+            sb.append("TC: " + tc).append("\n");
+            for (int i = 0; i < tc; ++i) {
+                prettyPrintTag(fta, i, bbis, dis, sb);
+            }
+            System.err.println(sb.toString());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Debugging method
+     * @param fta
+     * @param tid
+     * @param bbis
+     * @param dis
+     * @param sb
+     */
+    protected void prettyPrintTag(IFrameTupleAccessor fta, int tid, ByteBufferInputStream bbis, DataInputStream dis,
+            StringBuilder sb) {
+        sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")[");
+        for (int j = 0; j < fta.getFieldCount(); ++j) {
+            sb.append(" ");
+            if (j > 0) {
+                sb.append("|");
+            }
+            sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid, j) + ") ");
+            sb.append("{");
+            sb.append(Byte.toString(fta.getBuffer().array()[fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength()
+                    + fta.getFieldStartOffset(tid, j)]));
+            sb.append("}");
+        }
+        sb.append("\n");
+    }
+
+    /**
+     * Debugging method
+     * @param fta
+     * @param recordDescriptor
+     * @param tid
+     * @param bbis
+     * @param dis
+     * @param sb
+     */
+    protected void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid,
+            ByteBufferInputStream bbis, DataInputStream dis,
+            StringBuilder sb) {
+        sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")[");
+        for (int j = 0; j < fta.getFieldCount(); ++j) {
+            sb.append(" ");
+            if (j > 0) {
+                sb.append("|");
+            }
+            sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid, j) + ") ");
+            sb.append("{");
+            bbis.setByteBuffer(fta.getBuffer(), fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength() + fta
+                    .getFieldStartOffset(tid, j));
+            try {
+                sb.append(recordDescriptor.getFields()[j].deserialize(dis));
+            } catch (Exception e) {
+                e.printStackTrace();
+                sb.append("Failed to deserialize field" + j);
+            }
+            sb.append("}");
+        }
+        sb.append("\n");
+    }
+
+
+    /**
+     * Debugging method
+     * @param fta
+     * @param recordDescriptor
+     * @param tid
+     */
+    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid) {
+        try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+                DataInputStream dis = new DataInputStream(bbis)) {
+            StringBuilder sb = new StringBuilder();
+            prettyPrint(fta, recordDescriptor, tid, bbis, dis, sb);
+            System.err.println(sb.toString());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Debugging method
+     * They are safe as they don't print records. Printing records
+     * using IserializerDeserializer can print incorrect results or throw exceptions.
+     * A better way yet would be to use record pointable.
+     * @param fta
+     * @param recordDescriptor
+     * @param prefix
+     * @param recordFields
+     * @throws IOException
+     */
+    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String prefix,
+            int[] recordFields) throws IOException {
+        try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+                DataInputStream dis = new DataInputStream(bbis)) {
+            int tc = fta.getTupleCount();
+            StringBuilder sb = new StringBuilder();
+            sb.append(prefix).append("TC: " + tc).append("\n");
+            for (int i = 0; i < tc; ++i) {
+                prettyPrint(fta, recordDescriptor, i, bbis, dis, sb, recordFields);
+            }
+            System.err.println(sb.toString());
+        }
+    }
+
+    /**
+     * Debugging method
+     * @param fta
+     * @param recordDescriptor
+     * @param tIdx
+     * @param recordFields
+     * @throws IOException
+     */
+    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tIdx, int[] recordFields)
+            throws IOException {
+        try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+                DataInputStream dis = new DataInputStream(bbis)) {
+            StringBuilder sb = new StringBuilder();
+            prettyPrint(fta, recordDescriptor, tIdx, bbis, dis, sb, recordFields);
+            System.err.println(sb.toString());
+        }
+    }
+
+    /**
+     * Debugging method
+     * @param tuple
+     * @param fieldsIdx
+     * @param descIdx
+     * @throws HyracksDataException
+     */
+    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference tuple,
+            int fieldsIdx, int descIdx)
+            throws HyracksDataException {
+        try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+                DataInputStream dis = new DataInputStream(bbis)) {
+            StringBuilder sb = new StringBuilder();
+            sb.append("[");
+            sb.append("f" + fieldsIdx + ":(" + tuple.getFieldStart(fieldsIdx) + ", "
+                    + (tuple.getFieldLength(fieldsIdx) + tuple.getFieldStart(fieldsIdx)) + ") ");
+            sb.append("{");
+            ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(fieldsIdx));
+            bbis.setByteBuffer(bytebuff, tuple.getFieldStart(fieldsIdx));
+            sb.append(recordDescriptor.getFields()[descIdx].deserialize(dis));
+            sb.append("}");
+            sb.append("\n");
+            System.err.println(sb.toString());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Debugging method
+     * @param tuple
+     * @param descF
+     * @throws HyracksDataException
+     */
+    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference tuple,
+            int[] descF) throws HyracksDataException {
+        try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+                DataInputStream dis = new DataInputStream(bbis)) {
+            StringBuilder sb = new StringBuilder();
+            sb.append("[");
+            for (int j = 0; j < descF.length; ++j) {
+                sb.append("f" + j + ":(" + tuple.getFieldStart(j) + ", "
+                        + (tuple.getFieldLength(j) + tuple.getFieldStart(j)) + ") ");
+                sb.append("{");
+                ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(j));
+                bbis.setByteBuffer(bytebuff, tuple.getFieldStart(j));
+                sb.append(recordDescriptor.getFields()[descF[j]].deserialize(dis));
+                sb.append("}");
+            }
+            sb.append("\n");
+            System.err.println(sb.toString());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Debugging method
+     * @param fta
+     * @param recordDescriptor
+     * @param tid
+     * @param bbis
+     * @param dis
+     * @param sb
+     * @param recordFields
+     * @throws IOException
+     */
+    protected void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid,
+            ByteBufferInputStream bbis, DataInputStream dis,
+            StringBuilder sb,
+            int[] recordFields) throws IOException {
+        Arrays.sort(recordFields);
+        sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")[");
+        for (int j = 0; j < fta.getFieldCount(); ++j) {
+            sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid, j) + ") ");
+            sb.append("{");
+            bbis.setByteBuffer(fta.getBuffer(), fta.getTupleStartOffset(tid) + fta
+                    .getFieldSlotsLength() + fta.getFieldStartOffset(tid, j));
+            if (Arrays.binarySearch(recordFields, j) >= 0) {
+                sb.append("{a record field: only print using pointable:");
+                sb.append("tag->" + dis.readByte() + "}");
+            } else {
+                sb.append(recordDescriptor.getFields()[j].deserialize(dis));
+            }
+            sb.append("}");
+        }
+        sb.append("\n");
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
index 567b7df..c68d59d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -31,7 +31,6 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.DataInputStream;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -40,8 +39,6 @@
 import java.util.Map;
 import java.util.TreeMap;
 
-import junit.extensions.PA;
-
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameReader;
@@ -62,6 +59,8 @@
 import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
 import org.junit.Test;
 
+import junit.extensions.PA;
+
 public class RunMergingFrameReaderTest {
     static IBinaryComparator[] Comparators = new IBinaryComparator[] { ComparatorFactories[0].createBinaryComparator(),
             ComparatorFactories[1].createBinaryComparator(), };
@@ -125,12 +124,6 @@
             return true;
         }
 
-        private void printFrame(ByteBuffer buffer) {
-            FrameTupleAccessor fta = new FrameTupleAccessor(RecordDesc);
-            fta.reset(buffer);
-            fta.prettyPrint();
-        }
-
         @Override
         public void close() throws HyracksDataException {
         }