[ASTERIXDB-2688][HYR] Fix use of a Hyracks task across join stages

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
In hash join, a task from the build stage is being used in the probe stage.
This is a problem since such tasks have already finished and notified the CC they are done.
One observed issue is related to issuing a warning in the probe phase where some warnings are
not reported because they are issued to tasks that have finished (the way this happened is that
a comparator was created in the build phase using the build-phase task. Then, this comparator
was used in the probe phase and issued a warning).

- make IHyracksJobletContext extend IHyracksCommonContext so that it is also a frame manager context
- make activites of join operators use the joblet context instead of the task context for acquiring buffers
- create the probe-to-build comparator in the probe phase so that the right task is used in the comparator

Change-Id: I38a4a779b9620494f15606162f0f1e9487fd0984
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4563
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 7a59926..41fed25 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -26,7 +26,7 @@
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatableRegistry;
 
-public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry {
+public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry, IHyracksCommonContext {
     INCServiceContext getServiceContext();
 
     JobId getJobId();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 35cf57f..02cd184 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -240,11 +240,13 @@
         nodeController.getExecutor().execute(() -> deallocatableRegistry.close());
     }
 
-    ByteBuffer allocateFrame() throws HyracksDataException {
+    @Override
+    public ByteBuffer allocateFrame() throws HyracksDataException {
         return frameManager.allocateFrame();
     }
 
-    ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
+    @Override
+    public ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
         if (serviceCtx.getMemoryManager().allocate(bytes)) {
             memoryAllocation.addAndGet(bytes);
             return frameManager.allocateFrame(bytes);
@@ -252,18 +254,21 @@
         throw new HyracksDataException("Unable to allocate frame: Not enough memory");
     }
 
-    ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newFrameSizeInBytes, boolean copyOldData)
+    @Override
+    public ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newFrameSizeInBytes, boolean copyOldData)
             throws HyracksDataException {
         return frameManager.reallocateFrame(usedBuffer, newFrameSizeInBytes, copyOldData);
     }
 
-    void deallocateFrames(int bytes) {
+    @Override
+    public void deallocateFrames(int bytes) {
         memoryAllocation.addAndGet(bytes);
         serviceCtx.getMemoryManager().deallocate(bytes);
         frameManager.deallocateFrames(bytes);
     }
 
-    public final int getFrameSize() {
+    @Override
+    public final int getInitialFrameSize() {
         return frameManager.getInitialFrameSize();
     }
 
@@ -271,7 +276,8 @@
         return maxWarnings;
     }
 
-    public IIOManager getIOManager() {
+    @Override
+    public IIOManager getIoManager() {
         return serviceCtx.getIoManager();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index e58e4a4..158e24e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -139,7 +139,7 @@
         this.taskAttemptId = taskId;
         this.displayName = displayName;
         this.executorService = executor;
-        fileFactory = new WorkspaceFileFactory(this, joblet.getIOManager());
+        fileFactory = new WorkspaceFileFactory(this, joblet.getIoManager());
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         counterMap = new HashMap<>();
         opEnv = joblet.getEnvironment();
@@ -181,12 +181,12 @@
 
     @Override
     public int getInitialFrameSize() {
-        return joblet.getFrameSize();
+        return joblet.getInitialFrameSize();
     }
 
     @Override
     public IIOManager getIoManager() {
-        return joblet.getIOManager();
+        return joblet.getIoManager();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index eadbcf7..fb2d4e9 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -26,7 +26,7 @@
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
@@ -49,35 +49,35 @@
     private final List<ByteBuffer> buffers;
     private final FrameTupleAccessor accessorBuild;
     private final ITuplePartitionComputer tpcBuild;
-    private IFrameTupleAccessor accessorProbe;
+    private final IFrameTupleAccessor accessorProbe;
     private final ITuplePartitionComputer tpcProbe;
     private final FrameTupleAppender appender;
-    private final ITuplePairComparator tpComparator;
+    private ITuplePairComparator tpComparator;
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder missingTupleBuild;
     private final ISerializableTable table;
     private final TuplePointer storedTuplePointer;
     private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
     private final IPredicateEvaluator predEvaluator;
-    private TupleInFrameListAccessor tupleAccessor;
+    private final TupleInFrameListAccessor tupleAccessor;
     // To release frames
-    ISimpleFrameBufferManager bufferManager;
+    private final ISimpleFrameBufferManager bufferManager;
     private final boolean isTableCapacityNotZero;
 
     private static final Logger LOGGER = LogManager.getLogger();
 
-    public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe,
-            FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild,
-            ITuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
+    public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
+            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
+            ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
             ISerializableTable table, IPredicateEvaluator predEval, ISimpleFrameBufferManager bufferManager)
             throws HyracksDataException {
-        this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, comparator, isLeftOuter,
-                missingWritersBuild, table, predEval, false, bufferManager);
+        this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, isLeftOuter, missingWritersBuild, table,
+                predEval, false, bufferManager);
     }
 
-    public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe,
-            FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild,
-            ITuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
+    public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
+            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
+            ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
             ISerializableTable table, IPredicateEvaluator predEval, boolean reverse,
             ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
         this.table = table;
@@ -88,7 +88,6 @@
         this.accessorProbe = accessorProbe;
         this.tpcProbe = tpcProbe;
         appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        tpComparator = comparator;
         predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
         if (isLeftOuter) {
@@ -105,11 +104,7 @@
         reverseOutputOrder = reverse;
         this.tupleAccessor = new TupleInFrameListAccessor(rDBuild, buffers);
         this.bufferManager = bufferManager;
-        if (table.getTableSize() != 0) {
-            isTableCapacityNotZero = true;
-        } else {
-            isTableCapacityNotZero = false;
-        }
+        this.isTableCapacityNotZero = table.getTableSize() != 0;
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("InMemoryHashJoin has been created for a table size of " + table.getTableSize()
                     + " for Thread ID " + Thread.currentThread().getId() + ".");
@@ -126,6 +121,7 @@
             storedTuplePointer.reset(bIndex, i);
             // If an insertion fails, then tries to insert the same tuple pointer again after compacting the table.
             if (!table.insert(entry, storedTuplePointer)) {
+                // TODO(ali): should check if insertion failed even after compaction and take action
                 compactTableAndInsertAgain(entry, storedTuplePointer);
             }
         }
@@ -152,6 +148,15 @@
     }
 
     /**
+     * Must be called before starting to join to set the right comparator with the right context.
+     *
+     * @param comparator the comparator to use for comparing the probe tuples against the build tuples
+     */
+    void setComparator(ITuplePairComparator comparator) {
+        tpComparator = comparator;
+    }
+
+    /**
      * Reads the given tuple from the probe side and joins it with tuples from the build side.
      * This method assumes that the accessorProbe is already set to the current probe frame.
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index ccca62d..33976a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -34,7 +35,6 @@
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -74,18 +74,8 @@
             IBinaryHashFunctionFactory[] hashFunctionFactories0, IBinaryHashFunctionFactory[] hashFunctionFactories1,
             ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int tableSize,
             IPredicateEvaluatorFactory predEvalFactory, int memSizeInFrames) {
-        super(spec, 2, 1);
-        this.keys0 = keys0;
-        this.keys1 = keys1;
-        this.hashFunctionFactories0 = hashFunctionFactories0;
-        this.hashFunctionFactories1 = hashFunctionFactories1;
-        this.comparatorFactory = comparatorFactory;
-        this.predEvaluatorFactory = predEvalFactory;
-        outRecDescs[0] = recordDescriptor;
-        this.isLeftOuter = false;
-        this.nonMatchWriterFactories = null;
-        this.tableSize = tableSize;
-        this.memSizeInFrames = memSizeInFrames;
+        this(spec, keys0, keys1, hashFunctionFactories0, hashFunctionFactories1, comparatorFactory, predEvalFactory,
+                recordDescriptor, false, null, tableSize, memSizeInFrames);
     }
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
@@ -100,7 +90,7 @@
         this.hashFunctionFactories1 = hashFunctionFactories1;
         this.comparatorFactory = comparatorFactory;
         this.predEvaluatorFactory = predEvalFactory;
-        outRecDescs[0] = recordDescriptor;
+        this.outRecDescs[0] = recordDescriptor;
         this.isLeftOuter = isLeftOuter;
         this.nonMatchWriterFactories = missingWriterFactories1;
         this.tableSize = tableSize;
@@ -125,11 +115,8 @@
         builder.addBlockingEdge(hba, hpa);
     }
 
-    public static class HashBuildTaskState extends AbstractStateObject {
-        private InMemoryHashJoin joiner;
-
-        public HashBuildTaskState() {
-        }
+    static class HashBuildTaskState extends AbstractStateObject {
+        InMemoryHashJoin joiner;
 
         private HashBuildTaskState(JobId jobId, TaskId taskId) {
             super(jobId, taskId);
@@ -160,21 +147,23 @@
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
+            final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(hpaId, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
-            final IMissingWriter[] nullWriters1 =
-                    isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
+            final IMissingWriter[] nullWriters1;
             if (isLeftOuter) {
+                nullWriters1 = new IMissingWriter[nonMatchWriterFactories.length];
                 for (int i = 0; i < nonMatchWriterFactories.length; i++) {
                     nullWriters1[i] = nonMatchWriterFactories[i].createMissingWriter();
                 }
+            } else {
+                nullWriters1 = null;
             }
             final IPredicateEvaluator predEvaluator =
                     (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
 
-            final int memSizeInBytes = memSizeInFrames * ctx.getInitialFrameSize();
-            final IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx, memSizeInBytes);
+            final int memSizeInBytes = memSizeInFrames * jobletCtx.getInitialFrameSize();
+            final IDeallocatableFramePool framePool = new DeallocatableFramePool(jobletCtx, memSizeInBytes);
             final ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
@@ -186,12 +175,11 @@
                             new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories0).createPartitioner(ctx);
                     ITuplePartitionComputer hpc1 =
                             new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories1).createPartitioner(ctx);
-                    state = new HashBuildTaskState(ctx.getJobletContext().getJobId(),
-                            new TaskId(getActivityId(), partition));
-                    ISerializableTable table = new SerializableHashTable(tableSize, ctx, bufferManager);
-                    state.joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0), hpc0,
-                            new FrameTupleAccessor(rd1), rd1, hpc1, comparator, isLeftOuter, nullWriters1, table,
-                            predEvaluator, bufferManager);
+                    state = new HashBuildTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
+                    ISerializableTable table = new SerializableHashTable(tableSize, jobletCtx, bufferManager);
+                    state.joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(rd0), hpc0,
+                            new FrameTupleAccessor(rd1), rd1, hpc1, isLeftOuter, nullWriters1, table, predEvaluator,
+                            bufferManager);
                 }
 
                 @Override
@@ -250,6 +238,7 @@
                     writer.open();
                     state = (HashBuildTaskState) ctx
                             .getStateObject(new TaskId(new ActivityId(getOperatorId(), 0), partition));
+                    state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx));
                 }
 
                 @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index d0f5a73..361d1ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
@@ -46,7 +46,7 @@
     private final FrameTupleAccessor accessorInner;
     private final FrameTupleAccessor accessorOuter;
     private final FrameTupleAppender appender;
-    private final ITuplePairComparator tpComparator;
+    private ITuplePairComparator tpComparator;
     private final IFrame outBuffer;
     private final IFrame innerBuffer;
     private final VariableFrameMemoryManager outerBufferMngr;
@@ -55,24 +55,23 @@
     private final ArrayTupleBuilder missingTupleBuilder;
     private final IPredicateEvaluator predEvaluator;
     private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
-    private BufferInfo tempInfo = new BufferInfo(null, -1, -1);
+    private final BufferInfo tempInfo = new BufferInfo(null, -1, -1);
 
-    public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorOuter, FrameTupleAccessor accessorInner,
-            ITuplePairComparator comparatorsOuter2Inner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
+    public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
+            FrameTupleAccessor accessorInner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
             IMissingWriter[] missingWriters) throws HyracksDataException {
         this.accessorInner = accessorInner;
         this.accessorOuter = accessorOuter;
         this.appender = new FrameTupleAppender();
-        this.tpComparator = comparatorsOuter2Inner;
-        this.outBuffer = new VSizeFrame(ctx);
-        this.innerBuffer = new VSizeFrame(ctx);
+        this.outBuffer = new VSizeFrame(jobletContext);
+        this.innerBuffer = new VSizeFrame(jobletContext);
         this.appender.reset(outBuffer, true);
         if (memSize < 3) {
             throw new HyracksDataException("Not enough memory is available for Nested Loop Join");
         }
-        this.outerBufferMngr =
-                new VariableFrameMemoryManager(new VariableFramePool(ctx, ctx.getInitialFrameSize() * (memSize - 2)),
-                        FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2));
+        this.outerBufferMngr = new VariableFrameMemoryManager(
+                new VariableFramePool(jobletContext, jobletContext.getInitialFrameSize() * (memSize - 2)),
+                FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2));
 
         this.predEvaluator = predEval;
         this.isReversed = false;
@@ -91,8 +90,8 @@
         }
 
         FileReference file =
-                ctx.getJobletContext().createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
-        runFileWriter = new RunFileWriter(file, ctx.getIoManager());
+                jobletContext.createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
+        runFileWriter = new RunFileWriter(file, jobletContext.getIoManager());
         runFileWriter.open();
     }
 
@@ -100,6 +99,15 @@
         runFileWriter.nextFrame(buffer);
     }
 
+    /**
+     * Must be called before starting to join to set the right comparator with the right context.
+     *
+     * @param comparator the comparator to use for comparing the probe tuples against the build tuples
+     */
+    void setComparator(ITuplePairComparator comparator) {
+        tpComparator = comparator;
+    }
+
     public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
         if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
             RunFileReader runFileReader = runFileWriter.createReader();
@@ -131,7 +139,7 @@
         for (int i = 0; i < tupleCount0; ++i) {
             boolean matchFound = false;
             for (int j = 0; j < tupleCount1; ++j) {
-                int c = compare(accessorOuter, i, accessorInner, j);
+                int c = tpComparator.compare(accessorOuter, i, accessorInner, j);
                 boolean prdEval = evaluatePredicate(i, j);
                 if (c == 0 && prdEval) {
                     matchFound = true;
@@ -195,15 +203,6 @@
         outerBufferMngr.reset();
     }
 
-    private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
-            throws HyracksDataException {
-        int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
-        if (c != 0) {
-            return c;
-        }
-        return 0;
-    }
-
     public void setIsReversed(boolean b) {
         this.isReversed = b;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 2236056..1de8094 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -21,6 +21,7 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -31,7 +32,6 @@
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -114,9 +114,9 @@
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
+            final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
             final IPredicateEvaluator predEvaluator =
                     (predEvaluatorFactory != null) ? predEvaluatorFactory.createPredicateEvaluator() : null;
 
@@ -132,17 +132,15 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(),
-                            new TaskId(getActivityId(), partition));
-
-                    state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0), new FrameTupleAccessor(rd1),
-                            comparator, memSize, predEvaluator, isLeftOuter, nullWriters1);
+                    state = new JoinCacheTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
+                    state.joiner = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(rd0),
+                            new FrameTupleAccessor(rd1), memSize, predEvaluator, isLeftOuter, nullWriters1);
 
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity());
+                    ByteBuffer copyBuffer = jobletCtx.allocateFrame(buffer.capacity());
                     FrameUtils.copyAndFlip(buffer, copyBuffer);
                     state.joiner.cache(copyBuffer);
                 }
@@ -180,6 +178,7 @@
                     writer.open();
                     state = (JoinCacheTaskState) ctx.getStateObject(
                             new TaskId(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID), partition));
+                    state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx));
                 }
 
                 @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 7b6dcdb..9c28c61 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
@@ -62,11 +62,10 @@
         PROBE
     }
 
-    private final IHyracksTaskContext ctx;
+    private final IHyracksJobletContext jobletCtx;
 
     private final String buildRelName;
     private final String probeRelName;
-    private final ITuplePairComparator comparator;
     private final ITuplePartitionComputer buildHpc;
     private final ITuplePartitionComputer probeHpc;
     private final RecordDescriptor buildRd;
@@ -95,17 +94,16 @@
     private final TuplePointer tempPtr = new TuplePointer();
     private int[] probePSizeInTups;
 
-    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
-            String probeRelName, String buildRelName, ITuplePairComparator comparator, RecordDescriptor probeRd,
-            RecordDescriptor buildRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
-            IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
-        this.ctx = ctx;
+    public OptimizedHybridHashJoin(IHyracksJobletContext jobletCtx, int memSizeInFrames, int numOfPartitions,
+            String probeRelName, String buildRelName, RecordDescriptor probeRd, RecordDescriptor buildRd,
+            ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval,
+            boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
+        this.jobletCtx = jobletCtx;
         this.memSizeInFrames = memSizeInFrames;
         this.buildRd = buildRd;
         this.probeRd = probeRd;
         this.buildHpc = buildHpc;
         this.probeHpc = probeHpc;
-        this.comparator = comparator;
         this.buildRelName = buildRelName;
         this.probeRelName = probeRelName;
         this.numOfPartitions = numOfPartitions;
@@ -127,7 +125,7 @@
 
     public void initBuild() throws HyracksDataException {
         IDeallocatableFramePool framePool =
-                new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
+                new DeallocatableFramePool(jobletCtx, memSizeInFrames * jobletCtx.getInitialFrameSize());
         bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
         bufferManager = new VPartitionTupleBufferManager(
                 PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
@@ -177,8 +175,8 @@
             int pid) throws HyracksDataException {
         RunFileWriter writer = runFileWriters[pid];
         if (writer == null) {
-            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
-            writer = new RunFileWriter(file, ctx.getIoManager());
+            FileReference file = jobletCtx.createManagedWorkspaceFile(refName);
+            writer = new RunFileWriter(file, jobletCtx.getIoManager());
             writer.open();
             runFileWriters[pid] = writer;
         }
@@ -194,10 +192,10 @@
         // and tries to bring back as many spilled partitions as possible if there is free space.
         int inMemTupCount = makeSpaceForHashTableAndBringBackSpilledPartitions();
 
-        ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
-        this.inMemJoiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRd), probeHpc,
-                new FrameTupleAccessor(buildRd), buildRd, buildHpc, comparator, isLeftOuter, nonMatchWriters, table,
-                predEvaluator, isReversed, bufferManagerForHashTable);
+        ISerializableTable table = new SerializableHashTable(inMemTupCount, jobletCtx, bufferManagerForHashTable);
+        this.inMemJoiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRd), probeHpc,
+                new FrameTupleAccessor(buildRd), buildRd, buildHpc, isLeftOuter, nonMatchWriters, table, predEvaluator,
+                isReversed, bufferManagerForHashTable);
 
         buildHashTable();
     }
@@ -250,7 +248,7 @@
      * @throws HyracksDataException
      */
     private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
-        int frameSize = ctx.getInitialFrameSize();
+        int frameSize = jobletCtx.getInitialFrameSize();
         long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize;
 
         int inMemTupCount = 0;
@@ -356,7 +354,7 @@
      * @return partition id of selected partition to reload
      */
     private int selectAPartitionToReload(long freeSpace, int pid, int inMemTupCount) {
-        int frameSize = ctx.getInitialFrameSize();
+        int frameSize = jobletCtx.getInitialFrameSize();
         // Add one frame to freeSpace to consider the one frame reserved for the spilled partition
         long totalFreeSpace = freeSpace + frameSize;
         if (totalFreeSpace > 0) {
@@ -379,7 +377,7 @@
         try {
             r.open();
             if (reloadBuffer == null) {
-                reloadBuffer = new VSizeFrame(ctx);
+                reloadBuffer = new VSizeFrame(jobletCtx);
             }
             while (r.nextFrame(reloadBuffer)) {
                 accessorBuild.reset(reloadBuffer.getBuffer());
@@ -430,8 +428,9 @@
         }
     }
 
-    public void initProbe() {
+    public void initProbe(ITuplePairComparator comparator) {
         probePSizeInTups = new int[numOfPartitions];
+        inMemJoiner.setComparator(comparator);
     }
 
     public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
@@ -464,7 +463,7 @@
                     VPartitionTupleBufferManager.calculateActualSize(null, accessorProbe.getTupleLength(tupleId));
             // If the partition is at least half-full and insertion fails, that partition is preferred to get
             // spilled, otherwise the biggest partition gets chosen as the victim.
-            boolean modestCase = recordSize <= (ctx.getInitialFrameSize() / 2);
+            boolean modestCase = recordSize <= (jobletCtx.getInitialFrameSize() / 2);
             int victim = (modestCase && bufferManager.getNumTuples(pid) > 0) ? pid
                     : spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
             // This method is called for the spilled partitions, so we know that this tuple is going to get written to
@@ -492,7 +491,7 @@
     private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
             throws HyracksDataException {
         if (bigProbeFrameAppender == null) {
-            bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+            bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx));
         }
         RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(probeRFWriters, probeRelName, pid);
         if (!bigProbeFrameAppender.append(accessorProbe, i)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 45cccec..1819b8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -26,6 +26,7 @@
 
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -190,7 +191,7 @@
     }
 
     //memorySize is the memory for join (we have already excluded the 2 buffers for in/out)
-    private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
+    private static int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
             throws HyracksDataException {
         int numberOfPartitions = 0;
         if (memorySize <= 2) {
@@ -260,8 +261,6 @@
 
             final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
-            final ITuplePairComparator probComparator =
-                    tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
             final IPredicateEvaluator predEvaluator =
                     (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
 
@@ -284,9 +283,9 @@
                     state.memForJoin = memSizeInFrames - 2;
                     state.numOfPartitions =
                             getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
-                    state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
-                            PROBE_REL, BUILD_REL, probComparator, probeRd, buildRd, probeHpc, buildHpc, predEvaluator,
-                            isLeftOuter, nonMatchWriterFactories);
+                    state.hybridHJ = new OptimizedHybridHashJoin(ctx.getJobletContext(), state.memForJoin,
+                            state.numOfPartitions, PROBE_REL, BUILD_REL, probeRd, buildRd, probeHpc, buildHpc,
+                            predEvaluator, isLeftOuter, nonMatchWriterFactories);
 
                     state.hybridHJ.initBuild();
                     if (LOGGER.isTraceEnabled()) {
@@ -373,8 +372,9 @@
             }
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
                 private BuildAndPartitionTaskState state;
-                private IFrame rPartbuff = new VSizeFrame(ctx);
+                private IFrame rPartbuff = new VSizeFrame(jobletCtx);
 
                 private FrameTupleAppender nullResultAppender = null;
                 private FrameTupleAccessor probeTupleAccessor;
@@ -386,7 +386,7 @@
                             new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
 
                     writer.open();
-                    state.hybridHJ.initProbe();
+                    state.hybridHJ.initProbe(probComp);
 
                     if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase.");
@@ -480,7 +480,7 @@
                             new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories)
                                     .createPartitioner(level);
 
-                    int frameSize = ctx.getInitialFrameSize();
+                    int frameSize = jobletCtx.getInitialFrameSize();
                     long buildPartSize = (long) Math.ceil((double) buildSideReader.getFileSize() / (double) frameSize);
                     long probePartSize = (long) Math.ceil((double) probeSideReader.getFileSize() / (double) frameSize);
                     int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple);
@@ -575,7 +575,7 @@
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
                     OptimizedHybridHashJoin rHHj;
                     int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
-                    rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, comp, probeRd,
+                    rHHj = new OptimizedHybridHashJoin(jobletCtx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeRd,
                             buildRd, probeHpc, buildHpc, predEvaluator, isLeftOuter, nonMatchWriterFactories); //checked-confirmed
 
                     rHHj.setIsReversed(isReversed);
@@ -598,7 +598,7 @@
                         probeSideReader.open();
                         rPartbuff.reset();
                         try {
-                            rHHj.initProbe();
+                            rHHj.initProbe(comp);
                             while (probeSideReader.nextFrame(rPartbuff)) {
                                 rHHj.probe(rPartbuff.getBuffer(), writer);
                             }
@@ -696,7 +696,7 @@
 
                 private void appendNullToProbeTuples(RunFileReader probReader) throws HyracksDataException {
                     if (nullResultAppender == null) {
-                        nullResultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+                        nullResultAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx));
                     }
                     if (probeTupleAccessor == null) {
                         probeTupleAccessor = new FrameTupleAccessor(probeRd);
@@ -725,14 +725,14 @@
                             && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
                     IDeallocatableFramePool framePool =
-                            new DeallocatableFramePool(ctx, state.memForJoin * ctx.getInitialFrameSize());
+                            new DeallocatableFramePool(jobletCtx, state.memForJoin * jobletCtx.getInitialFrameSize());
                     ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
 
-                    ISerializableTable table = new SerializableHashTable(tabSize, ctx, bufferManager);
-                    InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRDesc), hpcRepProbe,
-                            new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, comp, isLeftOuter,
+                    ISerializableTable table = new SerializableHashTable(tabSize, jobletCtx, bufferManager);
+                    InMemoryHashJoin joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRDesc),
+                            hpcRepProbe, new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, isLeftOuter,
                             nonMatchWriter, table, predEvaluator, isReversed, bufferManager);
-
+                    joiner.setComparator(comp);
                     try {
                         bReader.open();
                         rPartbuff.reset();
@@ -788,12 +788,12 @@
                     boolean isReversed = outerRd == buildRd && innerRd == probeRd;
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
                     ITuplePairComparator nljComptorOuterInner = isReversed ? buildComp : probComp;
-                    NestedLoopJoin nlj =
-                            new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd), new FrameTupleAccessor(innerRd),
-                                    nljComptorOuterInner, memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
+                    NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(outerRd),
+                            new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
                     nlj.setIsReversed(isReversed);
+                    nlj.setComparator(nljComptorOuterInner);
 
-                    IFrame cacheBuff = new VSizeFrame(ctx);
+                    IFrame cacheBuff = new VSizeFrame(jobletCtx);
                     try {
                         innerReader.open();
                         while (innerReader.nextFrame(cacheBuff)) {
@@ -808,7 +808,7 @@
                         }
                     }
                     try {
-                        IFrame joinBuff = new VSizeFrame(ctx);
+                        IFrame joinBuff = new VSizeFrame(jobletCtx);
                         outerReader.open();
                         try {
                             while (outerReader.nextFrame(joinBuff)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
index 4c6b70f..93739df 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
@@ -24,7 +24,7 @@
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -50,7 +50,7 @@
 public class OptimizedHybridHashJoinTest {
     int frameSize = 32768;
     int totalNumberOfFrames = 10;
-    IHyracksTaskContext ctx = TestUtils.create(frameSize);
+    IHyracksJobletContext ctx = TestUtils.create(frameSize).getJobletContext();
     OptimizedHybridHashJoin hhj;
     static IBinaryHashFunctionFamily[] propHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
     static IBinaryHashFunctionFamily[] buildHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
@@ -150,8 +150,8 @@
 
     private void testJoin(int memSizeInFrames, int numOfPartitions, VSizeFrame frame) throws HyracksDataException {
 
-        hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, comparator,
-                probeRd, buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
+        hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, probeRd,
+                buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
 
         hhj.initBuild();
 
@@ -184,7 +184,7 @@
                 //to the in memory joiner. As such, only next frame is important.
             }
         };
-        hhj.initProbe();
+        hhj.initProbe(comparator);
         for (int i = 0; i < totalNumberOfFrames; i++) {
             hhj.probe(frame.getBuffer(), writer);
             checkOneFrameReservedPerSpilledPartitions();
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index 2809343..c485b32 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -34,31 +34,33 @@
 import org.apache.hyracks.control.nc.resources.memory.FrameManager;
 
 public class TestJobletContext implements IHyracksJobletContext {
-    private final int frameSize;
+
     private final INCServiceContext serviceContext;
     private final FrameManager frameManger;
-    private JobId jobId;
-    private WorkspaceFileFactory fileFactory;
+    private final JobId jobId;
+    private final WorkspaceFileFactory fileFactory;
     private final long jobStartTime;
 
-    public TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
-        this.frameSize = frameSize;
+    TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
         this.serviceContext = serviceContext;
         this.jobId = jobId;
-        fileFactory = new WorkspaceFileFactory(this, getIOManager());
+        fileFactory = new WorkspaceFileFactory(this, getIoManager());
         this.frameManger = new FrameManager(frameSize);
         this.jobStartTime = System.currentTimeMillis();
     }
 
-    ByteBuffer allocateFrame() throws HyracksDataException {
+    @Override
+    public ByteBuffer allocateFrame() throws HyracksDataException {
         return frameManger.allocateFrame();
     }
 
+    @Override
     public ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
         return frameManger.allocateFrame(bytes);
     }
 
-    ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData)
+    @Override
+    public ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData)
             throws HyracksDataException {
         return frameManger.reallocateFrame(tobeDeallocate, newFrameSizeInBytes, copyOldData);
     }
@@ -67,15 +69,18 @@
         return null;
     }
 
-    void deallocateFrames(int bytes) {
+    @Override
+    public void deallocateFrames(int bytes) {
         frameManger.deallocateFrames(bytes);
     }
 
-    public int getFrameSize() {
-        return frameSize;
+    @Override
+    public final int getInitialFrameSize() {
+        return frameManger.getInitialFrameSize();
     }
 
-    public IIOManager getIOManager() {
+    @Override
+    public IIOManager getIoManager() {
         return serviceContext.getIoManager();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 7c602f5..dcb85f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -85,12 +85,12 @@
 
     @Override
     public int getInitialFrameSize() {
-        return jobletContext.getFrameSize();
+        return jobletContext.getInitialFrameSize();
     }
 
     @Override
     public IIOManager getIoManager() {
-        return jobletContext.getIOManager();
+        return jobletContext.getIoManager();
     }
 
     @Override