Merge branch 'gerrit/mad-hatter'

Change-Id: If1c03edce783ccd249d90383da938132ae654886
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 22719f6..7ee499e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.test.common;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hyracks.util.file.FileUtil.canonicalize;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
@@ -259,7 +260,7 @@
 
     public void runScriptAndCompareWithResult(File scriptFile, File expectedFile, File actualFile,
             ComparisonEnum compare, Charset actualEncoding, String statement) throws Exception {
-        LOGGER.info("Expected results file: {} ", expectedFile);
+        LOGGER.info("Expected results file: {} ", canonicalize(expectedFile));
         boolean regex = false;
         if (expectedFile.getName().endsWith(".ignore")) {
             return; //skip the comparison
@@ -331,7 +332,11 @@
                 throw createLineChangedException(scriptFile, "<EOF>", lineActual, num);
             }
         } catch (Exception e) {
-            LOGGER.info("Actual results file: {} encoding: {}", actualFile, actualEncoding);
+            if (!actualEncoding.equals(UTF_8)) {
+                LOGGER.info("Actual results file: {} encoding: {}", canonicalize(actualFile), actualEncoding);
+            } else {
+                LOGGER.info("Actual results file: {}", canonicalize(actualFile));
+            }
             throw e;
         }
 
@@ -339,8 +344,8 @@
 
     private ComparisonException createLineChangedException(File scriptFile, String lineExpected, String lineActual,
             int num) {
-        return new ComparisonException("Result for " + scriptFile + " changed at line " + num + ":\nexpected < "
-                + truncateIfLong(lineExpected) + "\nactual   > " + truncateIfLong(lineActual));
+        return new ComparisonException("Result for " + canonicalize(scriptFile) + " changed at line " + num
+                + ":\nexpected < " + truncateIfLong(lineExpected) + "\nactual   > " + truncateIfLong(lineActual));
     }
 
     private String truncateIfLong(String string) {
@@ -482,7 +487,7 @@
             if (match && !negate || negate && !match) {
                 continue;
             }
-            throw new Exception("Result for " + scriptFile + ": expected pattern '" + expression
+            throw new Exception("Result for " + canonicalize(scriptFile) + ": expected pattern '" + expression
                     + "' not found in result: " + actual);
         }
     }
@@ -511,7 +516,8 @@
                 }
                 endOfMatch = matcher.end();
             }
-            throw new Exception("Result for " + scriptFile + ": actual file did not match expected result");
+            throw new Exception(
+                    "Result for " + canonicalize(scriptFile) + ": actual file did not match expected result");
         }
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
index 4826a99..0ab41b7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.test.sqlpp;
 
+import static org.apache.hyracks.util.file.FileUtil.canonicalize;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
@@ -85,7 +86,7 @@
                 try {
                     if (queryCount >= expectedResultFileCtxs.size()
                             && !cUnit.getOutputDir().getValue().equals("none")) {
-                        throw new ComparisonException("no result file for " + testFile.toString() + "; queryCount: "
+                        throw new ComparisonException("no result file for " + canonicalize(testFile) + "; queryCount: "
                                 + queryCount + ", filectxs.size: " + expectedResultFileCtxs.size());
                     }
 
@@ -99,21 +100,21 @@
                             "[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " PASSED ");
                     queryCount++;
                 } catch (Exception e) {
-                    System.err.println("testFile " + testFile.toString() + " raised an exception: " + e);
+                    System.err.println("testFile " + canonicalize(testFile) + " raised an exception: " + e);
                     if (cUnit.getExpectedError().isEmpty()) {
                         e.printStackTrace();
                         System.err.println("...Unexpected!");
                         if (failedGroup != null) {
                             failedGroup.getTestCase().add(testCaseCtx.getTestCase());
                         }
-                        throw new Exception("Test \"" + testFile + "\" FAILED!", e);
+                        throw new Exception("Test \"" + canonicalize(testFile) + "\" FAILED!", e);
                     } else {
                         // must compare with the expected failure message
                         if (e instanceof ComparisonException) {
                             throw e;
                         }
-                        LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
-                                + " failed as expected: " + e.getMessage());
+                        LOGGER.info("[TEST]: " + canonicalize(testCaseCtx.getTestCase().getFilePath()) + "/"
+                                + cUnit.getName() + " failed as expected: " + e.getMessage());
                         System.err.println("...but that was expected.");
                     }
                 }
@@ -168,7 +169,7 @@
             runScriptAndCompareWithResult(queryFile, expectedFile, actualResultFile, ComparisonEnum.TEXT,
                     StandardCharsets.UTF_8, null);
         } catch (Exception e) {
-            GlobalConfig.ASTERIX_LOGGER.warn("Failed while testing file " + queryFile);
+            GlobalConfig.ASTERIX_LOGGER.warn("Failed while testing file " + canonicalize(queryFile));
             throw e;
         } finally {
             writer.close();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index b2335c5..7a6fb94 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -3373,6 +3373,7 @@
           <expected-warn>Incomparable input types: string and bigint (in line 25, at column 13)</expected-warn>
           <expected-warn>Incomparable input types: array and bigint (in line 23, at column 7)</expected-warn>
           <expected-warn>Incomparable input types: point and point (in line 24, at column 18)</expected-warn>
+          <expected-warn>Incomparable input types: bigint and string (in line 24, at column 46)</expected-warn>
       </compilation-unit>
     </test-case>
   </test-group>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
index b70e0c7..d74f500 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 
 import org.apache.hyracks.api.io.IWritable;
@@ -32,7 +33,9 @@
 
     private String address;
     // Cached locally, not serialized
-    private byte[] ipAddress;
+    private volatile byte[] ipAddress;
+    // Cached locally, not serialized
+    private volatile InetSocketAddress inetSocketAddress;
 
     private int port;
 
@@ -52,6 +55,14 @@
         ipAddress = null;
     }
 
+    public NetworkAddress(InetSocketAddress socketAddress) {
+        this.address = socketAddress.getHostString();
+        this.port = socketAddress.getPort();
+        if (!socketAddress.isUnresolved()) {
+            ipAddress = socketAddress.getAddress().getAddress();
+        }
+    }
+
     public String getAddress() {
         return address;
     }
@@ -64,6 +75,13 @@
         return ipAddress;
     }
 
+    public InetSocketAddress resolveInetSocketAddress() {
+        if (inetSocketAddress == null) {
+            inetSocketAddress = new InetSocketAddress(address, port);
+        }
+        return inetSocketAddress;
+    }
+
     public int getPort() {
         return port;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 7a59926..41fed25 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -26,7 +26,7 @@
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatableRegistry;
 
-public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry {
+public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry, IHyracksCommonContext {
     INCServiceContext getServiceContext();
 
     JobId getJobId();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 9007628..f54a62c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -147,9 +147,11 @@
     @Override
     public Map<String, NodeControllerInfo> getNodeControllerInfoMap() {
         Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
-        nodeRegistry.forEach(
-                (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataPort(),
-                        ncState.getResultPort(), ncState.getMessagingPort(), ncState.getCapacity().getCores())));
+        nodeRegistry
+                .forEach((key, ncState) -> result.put(key,
+                        new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataAddress(),
+                                ncState.getResultAddress(), ncState.getMessagingAddress(),
+                                ncState.getCapacity().getCores())));
         return result;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index bdc73d5..ad55398 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -415,7 +415,7 @@
                     for (int j = 0; j < inPartitionCounts[i]; ++j) {
                         TaskId producerTaskId = new TaskId(producerAid, j);
                         String nodeId = findTaskLocation(producerTaskId);
-                        partitionLocations[i][j] = nodeManager.getNodeControllerState(nodeId).getDataPort();
+                        partitionLocations[i][j] = nodeManager.getNodeControllerState(nodeId).getDataAddress();
                     }
                 }
                 tad.setInputPartitionLocations(partitionLocations);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
index 65851ef..41ff83d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
@@ -38,7 +38,7 @@
         INodeManager nodeManager = ccs.getNodeManager();
         NodeControllerState producerNCS = nodeManager.getNodeControllerState(desc.getNodeId());
         NodeControllerState requestorNCS = nodeManager.getNodeControllerState(req.getNodeId());
-        final NetworkAddress dataport = producerNCS.getDataPort();
+        final NetworkAddress dataport = producerNCS.getDataAddress();
         final INodeController requestorNC = requestorNCS.getNodeController();
         requestorNC.reportPartitionAvailability(pid, dataport);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 0ea9239..b1700fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -51,7 +51,7 @@
         String id = reg.getNodeId();
         LOGGER.info("registering node: {}", id);
         NodeControllerRemoteProxy nc = new NodeControllerRemoteProxy(ccs.getCcId(),
-                ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
+                ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress().resolveInetSocketAddress()));
         INodeManager nodeManager = ccs.getNodeManager();
         NodeParameters params = new NodeParameters();
         params.setClusterControllerInfo(ccs.getClusterControllerInfo());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index d92727c..5c85134 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -172,9 +172,9 @@
         NetworkAddress resultAddr = new NetworkAddress(ipAddr, 1002);
         NetworkAddress msgAddr = new NetworkAddress(ipAddr, 1003);
         when(ncState.getCapacity()).thenReturn(new NodeCapacity(NODE_MEMORY_SIZE, NODE_CORES));
-        when(ncState.getDataPort()).thenReturn(dataAddr);
-        when(ncState.getResultPort()).thenReturn(resultAddr);
-        when(ncState.getMessagingPort()).thenReturn(msgAddr);
+        when(ncState.getDataAddress()).thenReturn(dataAddr);
+        when(ncState.getResultAddress()).thenReturn(resultAddr);
+        when(ncState.getMessagingAddress()).thenReturn(msgAddr);
         when(ncState.getConfig())
                 .thenReturn(Collections.singletonMap(NCConfig.Option.DATA_PUBLIC_ADDRESS.toSerializable(), ipAddr));
         Mockito.when(ncState.getNodeController()).thenReturn(ncProxy);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
index 24a3e57..90a4432 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
@@ -47,11 +47,11 @@
 
     private final Map<SerializedOption, Object> config;
 
-    private final NetworkAddress dataPort;
+    private final NetworkAddress dataAddress;
 
-    private final NetworkAddress resultPort;
+    private final NetworkAddress resultAddress;
 
-    private final NetworkAddress messagingPort;
+    private final NetworkAddress messagingAddress;
 
     private final Set<JobId> activeJobIds;
 
@@ -151,9 +151,9 @@
         nodeId = reg.getNodeId();
         config = Collections.unmodifiableMap(reg.getConfig());
 
-        dataPort = reg.getDataPort();
-        resultPort = reg.getResultPort();
-        messagingPort = reg.getMessagingPort();
+        dataAddress = reg.getDataAddress();
+        resultAddress = reg.getResultAddress();
+        messagingAddress = reg.getMessagingAddress();
         activeJobIds = new HashSet<>();
 
         osName = reg.getOSName();
@@ -265,16 +265,16 @@
         return activeJobIds;
     }
 
-    public NetworkAddress getDataPort() {
-        return dataPort;
+    public NetworkAddress getDataAddress() {
+        return dataAddress;
     }
 
-    public NetworkAddress getResultPort() {
-        return resultPort;
+    public NetworkAddress getResultAddress() {
+        return resultAddress;
     }
 
-    public NetworkAddress getMessagingPort() {
-        return messagingPort;
+    public NetworkAddress getMessagingAddress() {
+        return messagingAddress;
     }
 
     public NodeCapacity getCapacity() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index 474bc0a..d85f53c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -22,7 +22,6 @@
 import static org.apache.hyracks.util.MXHelper.runtimeMXBean;
 
 import java.io.Serializable;
-import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,15 +36,17 @@
 import org.apache.hyracks.util.PidHelper;
 
 public final class NodeRegistration implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private final InetSocketAddress ncAddress;
+    private static final long serialVersionUID = 2L;
 
     private final String nodeId;
 
-    private final NetworkAddress dataPort;
+    private final NetworkAddress ncAddress;
 
-    private final NetworkAddress resultPort;
+    private final NetworkAddress dataAddress;
+
+    private final NetworkAddress resultAddress;
+
+    private final NetworkAddress messagingAddress;
 
     private final String osName;
 
@@ -73,22 +74,21 @@
 
     private final HeartbeatSchema hbSchema;
 
-    private final NetworkAddress messagingPort;
-
     private final long pid;
 
     private final NodeCapacity capacity;
 
     private final HashMap<SerializedOption, Object> config;
 
-    public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
-            NetworkAddress resultPort, HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity) {
+    public NodeRegistration(NetworkAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataAddress,
+            NetworkAddress resultAddress, HeartbeatSchema hbSchema, NetworkAddress messagingAddress,
+            NodeCapacity capacity) {
         this.ncAddress = ncAddress;
         this.nodeId = nodeId;
-        this.dataPort = dataPort;
-        this.resultPort = resultPort;
+        this.dataAddress = dataAddress;
+        this.resultAddress = resultAddress;
         this.hbSchema = hbSchema;
-        this.messagingPort = messagingPort;
+        this.messagingAddress = messagingAddress;
         this.capacity = capacity;
         this.osName = osMXBean.getName();
         this.arch = osMXBean.getArch();
@@ -110,7 +110,7 @@
         }
     }
 
-    public InetSocketAddress getNodeControllerAddress() {
+    public NetworkAddress getNodeControllerAddress() {
         return ncAddress;
     }
 
@@ -126,12 +126,12 @@
         return config;
     }
 
-    public NetworkAddress getDataPort() {
-        return dataPort;
+    public NetworkAddress getDataAddress() {
+        return dataAddress;
     }
 
-    public NetworkAddress getResultPort() {
-        return resultPort;
+    public NetworkAddress getResultAddress() {
+        return resultAddress;
     }
 
     public String getOSName() {
@@ -186,8 +186,8 @@
         return systemProperties;
     }
 
-    public NetworkAddress getMessagingPort() {
-        return messagingPort;
+    public NetworkAddress getMessagingAddress() {
+        return messagingAddress;
     }
 
     public long getPid() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 35cf57f..02cd184 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -240,11 +240,13 @@
         nodeController.getExecutor().execute(() -> deallocatableRegistry.close());
     }
 
-    ByteBuffer allocateFrame() throws HyracksDataException {
+    @Override
+    public ByteBuffer allocateFrame() throws HyracksDataException {
         return frameManager.allocateFrame();
     }
 
-    ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
+    @Override
+    public ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
         if (serviceCtx.getMemoryManager().allocate(bytes)) {
             memoryAllocation.addAndGet(bytes);
             return frameManager.allocateFrame(bytes);
@@ -252,18 +254,21 @@
         throw new HyracksDataException("Unable to allocate frame: Not enough memory");
     }
 
-    ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newFrameSizeInBytes, boolean copyOldData)
+    @Override
+    public ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newFrameSizeInBytes, boolean copyOldData)
             throws HyracksDataException {
         return frameManager.reallocateFrame(usedBuffer, newFrameSizeInBytes, copyOldData);
     }
 
-    void deallocateFrames(int bytes) {
+    @Override
+    public void deallocateFrames(int bytes) {
         memoryAllocation.addAndGet(bytes);
         serviceCtx.getMemoryManager().deallocate(bytes);
         frameManager.deallocateFrames(bytes);
     }
 
-    public final int getFrameSize() {
+    @Override
+    public final int getInitialFrameSize() {
         return frameManager.getInitialFrameSize();
     }
 
@@ -271,7 +276,8 @@
         return maxWarnings;
     }
 
-    public IIOManager getIOManager() {
+    @Override
+    public IIOManager getIoManager() {
         return serviceCtx.getIoManager();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index bf07c20..ac2ef83 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -319,11 +319,11 @@
 
     private void initNodeControllerState() {
         // Use "public" versions of network addresses and ports, if defined
-        InetSocketAddress ncAddress;
+        NetworkAddress ncAddress;
         if (ncConfig.getClusterPublicPort() == 0) {
-            ncAddress = ipc.getSocketAddress();
+            ncAddress = new NetworkAddress(ipc.getSocketAddress());
         } else {
-            ncAddress = new InetSocketAddress(ncConfig.getClusterPublicAddress(), ncConfig.getClusterPublicPort());
+            ncAddress = new NetworkAddress(ncConfig.getClusterPublicAddress(), ncConfig.getClusterPublicPort());
         }
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
@@ -431,7 +431,7 @@
         NodeParameters nodeParameters = ccc.getNodeParameters();
         // Start heartbeat generator.
         heartbeatManagers.computeIfAbsent(ccId, newCcId -> HeartbeatManager.init(this, ccc, hbTask.getHeartbeatData(),
-                nodeRegistration.getNodeControllerAddress()));
+                nodeRegistration.getNodeControllerAddress().resolveInetSocketAddress()));
         if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) {
             Timer ccTimer = new Timer("Timer-" + ccId, true);
             // Schedule profile dump generator.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index e58e4a4..158e24e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -139,7 +139,7 @@
         this.taskAttemptId = taskId;
         this.displayName = displayName;
         this.executorService = executor;
-        fileFactory = new WorkspaceFileFactory(this, joblet.getIOManager());
+        fileFactory = new WorkspaceFileFactory(this, joblet.getIoManager());
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         counterMap = new HashMap<>();
         opEnv = joblet.getEnvironment();
@@ -181,12 +181,12 @@
 
     @Override
     public int getInitialFrameSize() {
-        return joblet.getFrameSize();
+        return joblet.getInitialFrameSize();
     }
 
     @Override
     public IIOManager getIoManager() {
-        return joblet.getIOManager();
+        return joblet.getIoManager();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index eadbcf7..fb2d4e9 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -26,7 +26,7 @@
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
@@ -49,35 +49,35 @@
     private final List<ByteBuffer> buffers;
     private final FrameTupleAccessor accessorBuild;
     private final ITuplePartitionComputer tpcBuild;
-    private IFrameTupleAccessor accessorProbe;
+    private final IFrameTupleAccessor accessorProbe;
     private final ITuplePartitionComputer tpcProbe;
     private final FrameTupleAppender appender;
-    private final ITuplePairComparator tpComparator;
+    private ITuplePairComparator tpComparator;
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder missingTupleBuild;
     private final ISerializableTable table;
     private final TuplePointer storedTuplePointer;
     private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
     private final IPredicateEvaluator predEvaluator;
-    private TupleInFrameListAccessor tupleAccessor;
+    private final TupleInFrameListAccessor tupleAccessor;
     // To release frames
-    ISimpleFrameBufferManager bufferManager;
+    private final ISimpleFrameBufferManager bufferManager;
     private final boolean isTableCapacityNotZero;
 
     private static final Logger LOGGER = LogManager.getLogger();
 
-    public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe,
-            FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild,
-            ITuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
+    public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
+            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
+            ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
             ISerializableTable table, IPredicateEvaluator predEval, ISimpleFrameBufferManager bufferManager)
             throws HyracksDataException {
-        this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, comparator, isLeftOuter,
-                missingWritersBuild, table, predEval, false, bufferManager);
+        this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, isLeftOuter, missingWritersBuild, table,
+                predEval, false, bufferManager);
     }
 
-    public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe,
-            FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild,
-            ITuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
+    public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
+            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
+            ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
             ISerializableTable table, IPredicateEvaluator predEval, boolean reverse,
             ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
         this.table = table;
@@ -88,7 +88,6 @@
         this.accessorProbe = accessorProbe;
         this.tpcProbe = tpcProbe;
         appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        tpComparator = comparator;
         predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
         if (isLeftOuter) {
@@ -105,11 +104,7 @@
         reverseOutputOrder = reverse;
         this.tupleAccessor = new TupleInFrameListAccessor(rDBuild, buffers);
         this.bufferManager = bufferManager;
-        if (table.getTableSize() != 0) {
-            isTableCapacityNotZero = true;
-        } else {
-            isTableCapacityNotZero = false;
-        }
+        this.isTableCapacityNotZero = table.getTableSize() != 0;
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("InMemoryHashJoin has been created for a table size of " + table.getTableSize()
                     + " for Thread ID " + Thread.currentThread().getId() + ".");
@@ -126,6 +121,7 @@
             storedTuplePointer.reset(bIndex, i);
             // If an insertion fails, then tries to insert the same tuple pointer again after compacting the table.
             if (!table.insert(entry, storedTuplePointer)) {
+                // TODO(ali): should check if insertion failed even after compaction and take action
                 compactTableAndInsertAgain(entry, storedTuplePointer);
             }
         }
@@ -152,6 +148,15 @@
     }
 
     /**
+     * Must be called before starting to join to set the right comparator with the right context.
+     *
+     * @param comparator the comparator to use for comparing the probe tuples against the build tuples
+     */
+    void setComparator(ITuplePairComparator comparator) {
+        tpComparator = comparator;
+    }
+
+    /**
      * Reads the given tuple from the probe side and joins it with tuples from the build side.
      * This method assumes that the accessorProbe is already set to the current probe frame.
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index ccca62d..33976a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -34,7 +35,6 @@
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -74,18 +74,8 @@
             IBinaryHashFunctionFactory[] hashFunctionFactories0, IBinaryHashFunctionFactory[] hashFunctionFactories1,
             ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int tableSize,
             IPredicateEvaluatorFactory predEvalFactory, int memSizeInFrames) {
-        super(spec, 2, 1);
-        this.keys0 = keys0;
-        this.keys1 = keys1;
-        this.hashFunctionFactories0 = hashFunctionFactories0;
-        this.hashFunctionFactories1 = hashFunctionFactories1;
-        this.comparatorFactory = comparatorFactory;
-        this.predEvaluatorFactory = predEvalFactory;
-        outRecDescs[0] = recordDescriptor;
-        this.isLeftOuter = false;
-        this.nonMatchWriterFactories = null;
-        this.tableSize = tableSize;
-        this.memSizeInFrames = memSizeInFrames;
+        this(spec, keys0, keys1, hashFunctionFactories0, hashFunctionFactories1, comparatorFactory, predEvalFactory,
+                recordDescriptor, false, null, tableSize, memSizeInFrames);
     }
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
@@ -100,7 +90,7 @@
         this.hashFunctionFactories1 = hashFunctionFactories1;
         this.comparatorFactory = comparatorFactory;
         this.predEvaluatorFactory = predEvalFactory;
-        outRecDescs[0] = recordDescriptor;
+        this.outRecDescs[0] = recordDescriptor;
         this.isLeftOuter = isLeftOuter;
         this.nonMatchWriterFactories = missingWriterFactories1;
         this.tableSize = tableSize;
@@ -125,11 +115,8 @@
         builder.addBlockingEdge(hba, hpa);
     }
 
-    public static class HashBuildTaskState extends AbstractStateObject {
-        private InMemoryHashJoin joiner;
-
-        public HashBuildTaskState() {
-        }
+    static class HashBuildTaskState extends AbstractStateObject {
+        InMemoryHashJoin joiner;
 
         private HashBuildTaskState(JobId jobId, TaskId taskId) {
             super(jobId, taskId);
@@ -160,21 +147,23 @@
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
+            final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(hpaId, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
-            final IMissingWriter[] nullWriters1 =
-                    isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
+            final IMissingWriter[] nullWriters1;
             if (isLeftOuter) {
+                nullWriters1 = new IMissingWriter[nonMatchWriterFactories.length];
                 for (int i = 0; i < nonMatchWriterFactories.length; i++) {
                     nullWriters1[i] = nonMatchWriterFactories[i].createMissingWriter();
                 }
+            } else {
+                nullWriters1 = null;
             }
             final IPredicateEvaluator predEvaluator =
                     (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
 
-            final int memSizeInBytes = memSizeInFrames * ctx.getInitialFrameSize();
-            final IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx, memSizeInBytes);
+            final int memSizeInBytes = memSizeInFrames * jobletCtx.getInitialFrameSize();
+            final IDeallocatableFramePool framePool = new DeallocatableFramePool(jobletCtx, memSizeInBytes);
             final ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
@@ -186,12 +175,11 @@
                             new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories0).createPartitioner(ctx);
                     ITuplePartitionComputer hpc1 =
                             new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories1).createPartitioner(ctx);
-                    state = new HashBuildTaskState(ctx.getJobletContext().getJobId(),
-                            new TaskId(getActivityId(), partition));
-                    ISerializableTable table = new SerializableHashTable(tableSize, ctx, bufferManager);
-                    state.joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0), hpc0,
-                            new FrameTupleAccessor(rd1), rd1, hpc1, comparator, isLeftOuter, nullWriters1, table,
-                            predEvaluator, bufferManager);
+                    state = new HashBuildTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
+                    ISerializableTable table = new SerializableHashTable(tableSize, jobletCtx, bufferManager);
+                    state.joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(rd0), hpc0,
+                            new FrameTupleAccessor(rd1), rd1, hpc1, isLeftOuter, nullWriters1, table, predEvaluator,
+                            bufferManager);
                 }
 
                 @Override
@@ -250,6 +238,7 @@
                     writer.open();
                     state = (HashBuildTaskState) ctx
                             .getStateObject(new TaskId(new ActivityId(getOperatorId(), 0), partition));
+                    state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx));
                 }
 
                 @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index d0f5a73..361d1ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
@@ -46,7 +46,7 @@
     private final FrameTupleAccessor accessorInner;
     private final FrameTupleAccessor accessorOuter;
     private final FrameTupleAppender appender;
-    private final ITuplePairComparator tpComparator;
+    private ITuplePairComparator tpComparator;
     private final IFrame outBuffer;
     private final IFrame innerBuffer;
     private final VariableFrameMemoryManager outerBufferMngr;
@@ -55,24 +55,23 @@
     private final ArrayTupleBuilder missingTupleBuilder;
     private final IPredicateEvaluator predEvaluator;
     private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
-    private BufferInfo tempInfo = new BufferInfo(null, -1, -1);
+    private final BufferInfo tempInfo = new BufferInfo(null, -1, -1);
 
-    public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorOuter, FrameTupleAccessor accessorInner,
-            ITuplePairComparator comparatorsOuter2Inner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
+    public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
+            FrameTupleAccessor accessorInner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
             IMissingWriter[] missingWriters) throws HyracksDataException {
         this.accessorInner = accessorInner;
         this.accessorOuter = accessorOuter;
         this.appender = new FrameTupleAppender();
-        this.tpComparator = comparatorsOuter2Inner;
-        this.outBuffer = new VSizeFrame(ctx);
-        this.innerBuffer = new VSizeFrame(ctx);
+        this.outBuffer = new VSizeFrame(jobletContext);
+        this.innerBuffer = new VSizeFrame(jobletContext);
         this.appender.reset(outBuffer, true);
         if (memSize < 3) {
             throw new HyracksDataException("Not enough memory is available for Nested Loop Join");
         }
-        this.outerBufferMngr =
-                new VariableFrameMemoryManager(new VariableFramePool(ctx, ctx.getInitialFrameSize() * (memSize - 2)),
-                        FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2));
+        this.outerBufferMngr = new VariableFrameMemoryManager(
+                new VariableFramePool(jobletContext, jobletContext.getInitialFrameSize() * (memSize - 2)),
+                FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2));
 
         this.predEvaluator = predEval;
         this.isReversed = false;
@@ -91,8 +90,8 @@
         }
 
         FileReference file =
-                ctx.getJobletContext().createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
-        runFileWriter = new RunFileWriter(file, ctx.getIoManager());
+                jobletContext.createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
+        runFileWriter = new RunFileWriter(file, jobletContext.getIoManager());
         runFileWriter.open();
     }
 
@@ -100,6 +99,15 @@
         runFileWriter.nextFrame(buffer);
     }
 
+    /**
+     * Must be called before starting to join to set the right comparator with the right context.
+     *
+     * @param comparator the comparator to use for comparing the outer tuples against the inner tuples
+     */
+    void setComparator(ITuplePairComparator comparator) {
+        tpComparator = comparator;
+    }
+
     public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
         if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
             RunFileReader runFileReader = runFileWriter.createReader();
@@ -131,7 +139,7 @@
         for (int i = 0; i < tupleCount0; ++i) {
             boolean matchFound = false;
             for (int j = 0; j < tupleCount1; ++j) {
-                int c = compare(accessorOuter, i, accessorInner, j);
+                int c = tpComparator.compare(accessorOuter, i, accessorInner, j);
                 boolean prdEval = evaluatePredicate(i, j);
                 if (c == 0 && prdEval) {
                     matchFound = true;
@@ -195,15 +203,6 @@
         outerBufferMngr.reset();
     }
 
-    private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
-            throws HyracksDataException {
-        int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
-        if (c != 0) {
-            return c;
-        }
-        return 0;
-    }
-
     public void setIsReversed(boolean b) {
         this.isReversed = b;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 2236056..1de8094 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -21,6 +21,7 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -31,7 +32,6 @@
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -114,9 +114,9 @@
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
+            final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
             final IPredicateEvaluator predEvaluator =
                     (predEvaluatorFactory != null) ? predEvaluatorFactory.createPredicateEvaluator() : null;
 
@@ -132,17 +132,15 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(),
-                            new TaskId(getActivityId(), partition));
-
-                    state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0), new FrameTupleAccessor(rd1),
-                            comparator, memSize, predEvaluator, isLeftOuter, nullWriters1);
+                    state = new JoinCacheTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
+                    state.joiner = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(rd0),
+                            new FrameTupleAccessor(rd1), memSize, predEvaluator, isLeftOuter, nullWriters1);
 
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity());
+                    ByteBuffer copyBuffer = jobletCtx.allocateFrame(buffer.capacity());
                     FrameUtils.copyAndFlip(buffer, copyBuffer);
                     state.joiner.cache(copyBuffer);
                 }
@@ -180,6 +178,7 @@
                     writer.open();
                     state = (JoinCacheTaskState) ctx.getStateObject(
                             new TaskId(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID), partition));
+                    state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx));
                 }
 
                 @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 6fb55ec..dba21d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
@@ -62,11 +62,10 @@
         PROBE
     }
 
-    private final IHyracksTaskContext ctx;
+    private final IHyracksJobletContext jobletCtx;
 
     private final String buildRelName;
     private final String probeRelName;
-    private final ITuplePairComparator comparator;
     private final ITuplePartitionComputer buildHpc;
     private final ITuplePartitionComputer probeHpc;
     private final RecordDescriptor buildRd;
@@ -95,17 +94,16 @@
     private final TuplePointer tempPtr = new TuplePointer();
     private int[] probePSizeInTups;
 
-    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
-            String probeRelName, String buildRelName, ITuplePairComparator comparator, RecordDescriptor probeRd,
-            RecordDescriptor buildRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
-            IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
-        this.ctx = ctx;
+    public OptimizedHybridHashJoin(IHyracksJobletContext jobletCtx, int memSizeInFrames, int numOfPartitions,
+            String probeRelName, String buildRelName, RecordDescriptor probeRd, RecordDescriptor buildRd,
+            ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval,
+            boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
+        this.jobletCtx = jobletCtx;
         this.memSizeInFrames = memSizeInFrames;
         this.buildRd = buildRd;
         this.probeRd = probeRd;
         this.buildHpc = buildHpc;
         this.probeHpc = probeHpc;
-        this.comparator = comparator;
         this.buildRelName = buildRelName;
         this.probeRelName = probeRelName;
         this.numOfPartitions = numOfPartitions;
@@ -127,7 +125,7 @@
 
     public void initBuild() throws HyracksDataException {
         IDeallocatableFramePool framePool =
-                new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
+                new DeallocatableFramePool(jobletCtx, memSizeInFrames * jobletCtx.getInitialFrameSize());
         bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
         bufferManager = new VPartitionTupleBufferManager(
                 PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
@@ -177,8 +175,8 @@
             int pid) throws HyracksDataException {
         RunFileWriter writer = runFileWriters[pid];
         if (writer == null) {
-            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
-            writer = new RunFileWriter(file, ctx.getIoManager());
+            FileReference file = jobletCtx.createManagedWorkspaceFile(refName);
+            writer = new RunFileWriter(file, jobletCtx.getIoManager());
             writer.open();
             runFileWriters[pid] = writer;
         }
@@ -194,10 +192,10 @@
         // and tries to bring back as many spilled partitions as possible if there is free space.
         int inMemTupCount = makeSpaceForHashTableAndBringBackSpilledPartitions();
 
-        ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
-        this.inMemJoiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRd), probeHpc,
-                new FrameTupleAccessor(buildRd), buildRd, buildHpc, comparator, isLeftOuter, nonMatchWriters, table,
-                predEvaluator, isReversed, bufferManagerForHashTable);
+        ISerializableTable table = new SerializableHashTable(inMemTupCount, jobletCtx, bufferManagerForHashTable);
+        this.inMemJoiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRd), probeHpc,
+                new FrameTupleAccessor(buildRd), buildRd, buildHpc, isLeftOuter, nonMatchWriters, table, predEvaluator,
+                isReversed, bufferManagerForHashTable);
 
         buildHashTable();
     }
@@ -250,7 +248,7 @@
      * @throws HyracksDataException
      */
     private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
-        int frameSize = ctx.getInitialFrameSize();
+        int frameSize = jobletCtx.getInitialFrameSize();
         long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize;
 
         int inMemTupCount = 0;
@@ -356,7 +354,7 @@
      * @return partition id of selected partition to reload
      */
     private int selectAPartitionToReload(long freeSpace, int pid, int inMemTupCount) {
-        int frameSize = ctx.getInitialFrameSize();
+        int frameSize = jobletCtx.getInitialFrameSize();
         // Add one frame to freeSpace to consider the one frame reserved for the spilled partition
         long totalFreeSpace = freeSpace + frameSize;
         if (totalFreeSpace > 0) {
@@ -379,7 +377,7 @@
         try {
             r.open();
             if (reloadBuffer == null) {
-                reloadBuffer = new VSizeFrame(ctx);
+                reloadBuffer = new VSizeFrame(jobletCtx);
             }
             while (r.nextFrame(reloadBuffer)) {
                 accessorBuild.reset(reloadBuffer.getBuffer());
@@ -430,8 +428,9 @@
         }
     }
 
-    public void initProbe() {
+    public void initProbe(ITuplePairComparator comparator) {
         probePSizeInTups = new int[numOfPartitions];
+        inMemJoiner.setComparator(comparator);
         bufferManager.setConstrain(VPartitionTupleBufferManager.NO_CONSTRAIN);
     }
 
@@ -465,7 +464,7 @@
                     VPartitionTupleBufferManager.calculateActualSize(null, accessorProbe.getTupleLength(tupleId));
             // If the partition is at least half-full and insertion fails, that partition is preferred to get
             // spilled, otherwise the biggest partition gets chosen as the victim.
-            boolean modestCase = recordSize <= (ctx.getInitialFrameSize() / 2);
+            boolean modestCase = recordSize <= (jobletCtx.getInitialFrameSize() / 2);
             int victim = (modestCase && bufferManager.getNumTuples(pid) > 0) ? pid
                     : spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
             // This method is called for the spilled partitions, so we know that this tuple is going to get written to
@@ -493,7 +492,7 @@
     private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
             throws HyracksDataException {
         if (bigProbeFrameAppender == null) {
-            bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+            bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx));
         }
         RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(probeRFWriters, probeRelName, pid);
         if (!bigProbeFrameAppender.append(accessorProbe, i)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 45cccec..1819b8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -26,6 +26,7 @@
 
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -190,7 +191,7 @@
     }
 
     //memorySize is the memory for join (we have already excluded the 2 buffers for in/out)
-    private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
+    private static int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
             throws HyracksDataException {
         int numberOfPartitions = 0;
         if (memorySize <= 2) {
@@ -260,8 +261,6 @@
 
             final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
-            final ITuplePairComparator probComparator =
-                    tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
             final IPredicateEvaluator predEvaluator =
                     (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
 
@@ -284,9 +283,9 @@
                     state.memForJoin = memSizeInFrames - 2;
                     state.numOfPartitions =
                             getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
-                    state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
-                            PROBE_REL, BUILD_REL, probComparator, probeRd, buildRd, probeHpc, buildHpc, predEvaluator,
-                            isLeftOuter, nonMatchWriterFactories);
+                    state.hybridHJ = new OptimizedHybridHashJoin(ctx.getJobletContext(), state.memForJoin,
+                            state.numOfPartitions, PROBE_REL, BUILD_REL, probeRd, buildRd, probeHpc, buildHpc,
+                            predEvaluator, isLeftOuter, nonMatchWriterFactories);
 
                     state.hybridHJ.initBuild();
                     if (LOGGER.isTraceEnabled()) {
@@ -373,8 +372,9 @@
             }
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
                 private BuildAndPartitionTaskState state;
-                private IFrame rPartbuff = new VSizeFrame(ctx);
+                private IFrame rPartbuff = new VSizeFrame(jobletCtx);
 
                 private FrameTupleAppender nullResultAppender = null;
                 private FrameTupleAccessor probeTupleAccessor;
@@ -386,7 +386,7 @@
                             new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
 
                     writer.open();
-                    state.hybridHJ.initProbe();
+                    state.hybridHJ.initProbe(probComp);
 
                     if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase.");
@@ -480,7 +480,7 @@
                             new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories)
                                     .createPartitioner(level);
 
-                    int frameSize = ctx.getInitialFrameSize();
+                    int frameSize = jobletCtx.getInitialFrameSize();
                     long buildPartSize = (long) Math.ceil((double) buildSideReader.getFileSize() / (double) frameSize);
                     long probePartSize = (long) Math.ceil((double) probeSideReader.getFileSize() / (double) frameSize);
                     int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple);
@@ -575,7 +575,7 @@
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
                     OptimizedHybridHashJoin rHHj;
                     int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
-                    rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, comp, probeRd,
+                    rHHj = new OptimizedHybridHashJoin(jobletCtx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeRd,
                             buildRd, probeHpc, buildHpc, predEvaluator, isLeftOuter, nonMatchWriterFactories); //checked-confirmed
 
                     rHHj.setIsReversed(isReversed);
@@ -598,7 +598,7 @@
                         probeSideReader.open();
                         rPartbuff.reset();
                         try {
-                            rHHj.initProbe();
+                            rHHj.initProbe(comp);
                             while (probeSideReader.nextFrame(rPartbuff)) {
                                 rHHj.probe(rPartbuff.getBuffer(), writer);
                             }
@@ -696,7 +696,7 @@
 
                 private void appendNullToProbeTuples(RunFileReader probReader) throws HyracksDataException {
                     if (nullResultAppender == null) {
-                        nullResultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+                        nullResultAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx));
                     }
                     if (probeTupleAccessor == null) {
                         probeTupleAccessor = new FrameTupleAccessor(probeRd);
@@ -725,14 +725,14 @@
                             && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
                     IDeallocatableFramePool framePool =
-                            new DeallocatableFramePool(ctx, state.memForJoin * ctx.getInitialFrameSize());
+                            new DeallocatableFramePool(jobletCtx, state.memForJoin * jobletCtx.getInitialFrameSize());
                     ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
 
-                    ISerializableTable table = new SerializableHashTable(tabSize, ctx, bufferManager);
-                    InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRDesc), hpcRepProbe,
-                            new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, comp, isLeftOuter,
+                    ISerializableTable table = new SerializableHashTable(tabSize, jobletCtx, bufferManager);
+                    InMemoryHashJoin joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRDesc),
+                            hpcRepProbe, new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, isLeftOuter,
                             nonMatchWriter, table, predEvaluator, isReversed, bufferManager);
-
+                    joiner.setComparator(comp);
                     try {
                         bReader.open();
                         rPartbuff.reset();
@@ -788,12 +788,12 @@
                     boolean isReversed = outerRd == buildRd && innerRd == probeRd;
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
                     ITuplePairComparator nljComptorOuterInner = isReversed ? buildComp : probComp;
-                    NestedLoopJoin nlj =
-                            new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd), new FrameTupleAccessor(innerRd),
-                                    nljComptorOuterInner, memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
+                    NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(outerRd),
+                            new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
                     nlj.setIsReversed(isReversed);
+                    nlj.setComparator(nljComptorOuterInner);
 
-                    IFrame cacheBuff = new VSizeFrame(ctx);
+                    IFrame cacheBuff = new VSizeFrame(jobletCtx);
                     try {
                         innerReader.open();
                         while (innerReader.nextFrame(cacheBuff)) {
@@ -808,7 +808,7 @@
                         }
                     }
                     try {
-                        IFrame joinBuff = new VSizeFrame(ctx);
+                        IFrame joinBuff = new VSizeFrame(jobletCtx);
                         outerReader.open();
                         try {
                             while (outerReader.nextFrame(joinBuff)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
index 4c6b70f..93739df 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
@@ -24,7 +24,7 @@
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -50,7 +50,7 @@
 public class OptimizedHybridHashJoinTest {
     int frameSize = 32768;
     int totalNumberOfFrames = 10;
-    IHyracksTaskContext ctx = TestUtils.create(frameSize);
+    IHyracksJobletContext ctx = TestUtils.create(frameSize).getJobletContext();
     OptimizedHybridHashJoin hhj;
     static IBinaryHashFunctionFamily[] propHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
     static IBinaryHashFunctionFamily[] buildHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
@@ -150,8 +150,8 @@
 
     private void testJoin(int memSizeInFrames, int numOfPartitions, VSizeFrame frame) throws HyracksDataException {
 
-        hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, comparator,
-                probeRd, buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
+        hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, probeRd,
+                buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
 
         hhj.initBuild();
 
@@ -184,7 +184,7 @@
                 //to the in memory joiner. As such, only next frame is important.
             }
         };
-        hhj.initProbe();
+        hhj.initProbe(comparator);
         for (int i = 0; i < totalNumberOfFrames; i++) {
             hhj.probe(frame.getBuffer(), writer);
             checkOneFrameReservedPerSpilledPartitions();
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 4ca7f1a..0151dbf 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -151,7 +151,7 @@
 
     @Override
     public void addPendingCredits(int credit) {
-        cSet.addPendingCredits(channelId, credit);
+        cSet.addPendingCredits(this, credit);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
index 179f42c..20dc8f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -143,13 +143,15 @@
         }
     }
 
-    void addPendingCredits(int channelId, int delta) {
+    void addPendingCredits(ChannelControlBlock targetCcb, int delta) {
         if (delta <= 0) {
             return;
         }
         synchronized (mConn) {
+            final int channelId = targetCcb.getChannelId();
             ChannelControlBlock ccb = ccbArray[channelId];
-            if (ccb != null) {
+            // ensure the channel slot id was not recycled and used for a different channel
+            if (ccb == targetCcb) {
                 if (ccb.getRemoteEOS()) {
                     return;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index 2809343..c485b32 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -34,31 +34,33 @@
 import org.apache.hyracks.control.nc.resources.memory.FrameManager;
 
 public class TestJobletContext implements IHyracksJobletContext {
-    private final int frameSize;
+
     private final INCServiceContext serviceContext;
     private final FrameManager frameManger;
-    private JobId jobId;
-    private WorkspaceFileFactory fileFactory;
+    private final JobId jobId;
+    private final WorkspaceFileFactory fileFactory;
     private final long jobStartTime;
 
-    public TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
-        this.frameSize = frameSize;
+    TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
         this.serviceContext = serviceContext;
         this.jobId = jobId;
-        fileFactory = new WorkspaceFileFactory(this, getIOManager());
+        fileFactory = new WorkspaceFileFactory(this, getIoManager());
         this.frameManger = new FrameManager(frameSize);
         this.jobStartTime = System.currentTimeMillis();
     }
 
-    ByteBuffer allocateFrame() throws HyracksDataException {
+    @Override
+    public ByteBuffer allocateFrame() throws HyracksDataException {
         return frameManger.allocateFrame();
     }
 
+    @Override
     public ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
         return frameManger.allocateFrame(bytes);
     }
 
-    ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData)
+    @Override
+    public ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData)
             throws HyracksDataException {
         return frameManger.reallocateFrame(tobeDeallocate, newFrameSizeInBytes, copyOldData);
     }
@@ -67,15 +69,18 @@
         return null;
     }
 
-    void deallocateFrames(int bytes) {
+    @Override
+    public void deallocateFrames(int bytes) {
         frameManger.deallocateFrames(bytes);
     }
 
-    public int getFrameSize() {
-        return frameSize;
+    @Override
+    public final int getInitialFrameSize() {
+        return frameManger.getInitialFrameSize();
     }
 
-    public IIOManager getIOManager() {
+    @Override
+    public IIOManager getIoManager() {
         return serviceContext.getIoManager();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 7c602f5..dcb85f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -85,12 +85,12 @@
 
     @Override
     public int getInitialFrameSize() {
-        return jobletContext.getFrameSize();
+        return jobletContext.getInitialFrameSize();
     }
 
     @Override
     public IIOManager getIoManager() {
-        return jobletContext.getIOManager();
+        return jobletContext.getIoManager();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
index fe60653..7d162ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.file.Path;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
@@ -31,6 +32,7 @@
 public class FileUtil {
 
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final Pattern PARENT_DIR = Pattern.compile("([/\\\\]|^)[^./\\\\]+[/\\\\]\\.\\.([/\\\\]|$)");
     private static final Object LOCK = new Object();
     private static final int MAX_COPY_ATTEMPTS = 3;
 
@@ -95,4 +97,22 @@
             raf.getChannel().force(true);
         }
     }
+
+    public static String canonicalize(CharSequence path) {
+        String newPath = path.toString();
+        Matcher matcher = PARENT_DIR.matcher(newPath);
+        while (matcher.find()) {
+            // TODO(mblow): use StringBuilder once Java 8 is no longer supported (requires >=9)
+            StringBuffer sb = new StringBuffer();
+            matcher.appendReplacement(sb, matcher.group(2).isEmpty() ? "" : matcher.group(1).replace("\\", "\\\\"));
+            matcher.appendTail(sb);
+            newPath = sb.toString();
+            matcher.reset(newPath);
+        }
+        return newPath;
+    }
+
+    public static File canonicalize(File file) {
+        return new File(canonicalize(file.getPath()));
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/file/FileUtilTest.java b/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/file/FileUtilTest.java
index 8d6c631..87c730c 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/file/FileUtilTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/file/FileUtilTest.java
@@ -41,4 +41,27 @@
                 joinPath('\\', "\\\\myserver\\tmp\\\\far\\baz\\\\\\\\lala"));
         Assert.assertEquals("C:\\temp\\far\\baz\\lala", joinPath('\\', "C:\\temp\\\\far\\baz\\\\\\\\lala\\"));
     }
+
+    @Test
+    public void testCanonicalize() {
+        Assert.assertEquals("bat.txt", FileUtil.canonicalize("foo/../bat.txt"));
+        Assert.assertEquals("bat.txt", FileUtil.canonicalize("foo/bar/../../bat.txt"));
+        Assert.assertEquals("foo/", FileUtil.canonicalize("foo/bar/../"));
+        Assert.assertEquals("foo", FileUtil.canonicalize("foo/bar/.."));
+        Assert.assertEquals("../bat.txt", FileUtil.canonicalize("../bat.txt"));
+        Assert.assertEquals("/bat.txt", FileUtil.canonicalize("/foo/bar/../../bat.txt"));
+        Assert.assertEquals("/bar/bat.txt", FileUtil.canonicalize("/foo/../bar/bat.txt"));
+    }
+
+    @Test
+    public void testCanonicalizeWindoze() {
+        Assert.assertEquals("bat.txt", FileUtil.canonicalize("foo\\..\\bat.txt"));
+        Assert.assertEquals("bat.txt", FileUtil.canonicalize("foo\\bar\\..\\..\\bat.txt"));
+        Assert.assertEquals("foo\\", FileUtil.canonicalize("foo\\bar\\..\\"));
+        Assert.assertEquals("foo", FileUtil.canonicalize("foo\\bar\\.."));
+        Assert.assertEquals("..\\bat.txt", FileUtil.canonicalize("..\\bat.txt"));
+        Assert.assertEquals("\\bat.txt", FileUtil.canonicalize("\\foo\\bar\\..\\..\\bat.txt"));
+        Assert.assertEquals("\\bar\\bat.txt", FileUtil.canonicalize("\\foo\\..\\bar\\bat.txt"));
+        Assert.assertEquals("C:\\bar\\bat.txt", FileUtil.canonicalize("C:\\foo\\..\\bar\\bat.txt"));
+    }
 }