Merge branch 'gerrit/mad-hatter'
Change-Id: If1c03edce783ccd249d90383da938132ae654886
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 22719f6..7ee499e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -19,6 +19,7 @@
package org.apache.asterix.test.common;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hyracks.util.file.FileUtil.canonicalize;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
@@ -259,7 +260,7 @@
public void runScriptAndCompareWithResult(File scriptFile, File expectedFile, File actualFile,
ComparisonEnum compare, Charset actualEncoding, String statement) throws Exception {
- LOGGER.info("Expected results file: {} ", expectedFile);
+ LOGGER.info("Expected results file: {} ", canonicalize(expectedFile));
boolean regex = false;
if (expectedFile.getName().endsWith(".ignore")) {
return; //skip the comparison
@@ -331,7 +332,11 @@
throw createLineChangedException(scriptFile, "<EOF>", lineActual, num);
}
} catch (Exception e) {
- LOGGER.info("Actual results file: {} encoding: {}", actualFile, actualEncoding);
+ if (!actualEncoding.equals(UTF_8)) {
+ LOGGER.info("Actual results file: {} encoding: {}", canonicalize(actualFile), actualEncoding);
+ } else {
+ LOGGER.info("Actual results file: {}", canonicalize(actualFile));
+ }
throw e;
}
@@ -339,8 +344,8 @@
private ComparisonException createLineChangedException(File scriptFile, String lineExpected, String lineActual,
int num) {
- return new ComparisonException("Result for " + scriptFile + " changed at line " + num + ":\nexpected < "
- + truncateIfLong(lineExpected) + "\nactual > " + truncateIfLong(lineActual));
+ return new ComparisonException("Result for " + canonicalize(scriptFile) + " changed at line " + num
+ + ":\nexpected < " + truncateIfLong(lineExpected) + "\nactual > " + truncateIfLong(lineActual));
}
private String truncateIfLong(String string) {
@@ -482,7 +487,7 @@
if (match && !negate || negate && !match) {
continue;
}
- throw new Exception("Result for " + scriptFile + ": expected pattern '" + expression
+ throw new Exception("Result for " + canonicalize(scriptFile) + ": expected pattern '" + expression
+ "' not found in result: " + actual);
}
}
@@ -511,7 +516,8 @@
}
endOfMatch = matcher.end();
}
- throw new Exception("Result for " + scriptFile + ": actual file did not match expected result");
+ throw new Exception(
+ "Result for " + canonicalize(scriptFile) + ": actual file did not match expected result");
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
index 4826a99..0ab41b7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.test.sqlpp;
+import static org.apache.hyracks.util.file.FileUtil.canonicalize;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
@@ -85,7 +86,7 @@
try {
if (queryCount >= expectedResultFileCtxs.size()
&& !cUnit.getOutputDir().getValue().equals("none")) {
- throw new ComparisonException("no result file for " + testFile.toString() + "; queryCount: "
+ throw new ComparisonException("no result file for " + canonicalize(testFile) + "; queryCount: "
+ queryCount + ", filectxs.size: " + expectedResultFileCtxs.size());
}
@@ -99,21 +100,21 @@
"[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " PASSED ");
queryCount++;
} catch (Exception e) {
- System.err.println("testFile " + testFile.toString() + " raised an exception: " + e);
+ System.err.println("testFile " + canonicalize(testFile) + " raised an exception: " + e);
if (cUnit.getExpectedError().isEmpty()) {
e.printStackTrace();
System.err.println("...Unexpected!");
if (failedGroup != null) {
failedGroup.getTestCase().add(testCaseCtx.getTestCase());
}
- throw new Exception("Test \"" + testFile + "\" FAILED!", e);
+ throw new Exception("Test \"" + canonicalize(testFile) + "\" FAILED!", e);
} else {
// must compare with the expected failure message
if (e instanceof ComparisonException) {
throw e;
}
- LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
- + " failed as expected: " + e.getMessage());
+ LOGGER.info("[TEST]: " + canonicalize(testCaseCtx.getTestCase().getFilePath()) + "/"
+ + cUnit.getName() + " failed as expected: " + e.getMessage());
System.err.println("...but that was expected.");
}
}
@@ -168,7 +169,7 @@
runScriptAndCompareWithResult(queryFile, expectedFile, actualResultFile, ComparisonEnum.TEXT,
StandardCharsets.UTF_8, null);
} catch (Exception e) {
- GlobalConfig.ASTERIX_LOGGER.warn("Failed while testing file " + queryFile);
+ GlobalConfig.ASTERIX_LOGGER.warn("Failed while testing file " + canonicalize(queryFile));
throw e;
} finally {
writer.close();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index b2335c5..7a6fb94 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -3373,6 +3373,7 @@
<expected-warn>Incomparable input types: string and bigint (in line 25, at column 13)</expected-warn>
<expected-warn>Incomparable input types: array and bigint (in line 23, at column 7)</expected-warn>
<expected-warn>Incomparable input types: point and point (in line 24, at column 18)</expected-warn>
+ <expected-warn>Incomparable input types: bigint and string (in line 24, at column 46)</expected-warn>
</compilation-unit>
</test-case>
</test-group>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
index b70e0c7..d74f500 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import org.apache.hyracks.api.io.IWritable;
@@ -32,7 +33,9 @@
private String address;
// Cached locally, not serialized
- private byte[] ipAddress;
+ private volatile byte[] ipAddress;
+ // Cached locally, not serialized
+ private volatile InetSocketAddress inetSocketAddress;
private int port;
@@ -52,6 +55,14 @@
ipAddress = null;
}
+ public NetworkAddress(InetSocketAddress socketAddress) {
+ this.address = socketAddress.getHostString();
+ this.port = socketAddress.getPort();
+ if (!socketAddress.isUnresolved()) {
+ ipAddress = socketAddress.getAddress().getAddress();
+ }
+ }
+
public String getAddress() {
return address;
}
@@ -64,6 +75,13 @@
return ipAddress;
}
+ public InetSocketAddress resolveInetSocketAddress() {
+ if (inetSocketAddress == null) {
+ inetSocketAddress = new InetSocketAddress(address, port);
+ }
+ return inetSocketAddress;
+ }
+
public int getPort() {
return port;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 7a59926..41fed25 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -26,7 +26,7 @@
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatableRegistry;
-public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry {
+public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry, IHyracksCommonContext {
INCServiceContext getServiceContext();
JobId getJobId();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 9007628..f54a62c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -147,9 +147,11 @@
@Override
public Map<String, NodeControllerInfo> getNodeControllerInfoMap() {
Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
- nodeRegistry.forEach(
- (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataPort(),
- ncState.getResultPort(), ncState.getMessagingPort(), ncState.getCapacity().getCores())));
+ nodeRegistry
+ .forEach((key, ncState) -> result.put(key,
+ new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataAddress(),
+ ncState.getResultAddress(), ncState.getMessagingAddress(),
+ ncState.getCapacity().getCores())));
return result;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index bdc73d5..ad55398 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -415,7 +415,7 @@
for (int j = 0; j < inPartitionCounts[i]; ++j) {
TaskId producerTaskId = new TaskId(producerAid, j);
String nodeId = findTaskLocation(producerTaskId);
- partitionLocations[i][j] = nodeManager.getNodeControllerState(nodeId).getDataPort();
+ partitionLocations[i][j] = nodeManager.getNodeControllerState(nodeId).getDataAddress();
}
}
tad.setInputPartitionLocations(partitionLocations);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
index 65851ef..41ff83d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
@@ -38,7 +38,7 @@
INodeManager nodeManager = ccs.getNodeManager();
NodeControllerState producerNCS = nodeManager.getNodeControllerState(desc.getNodeId());
NodeControllerState requestorNCS = nodeManager.getNodeControllerState(req.getNodeId());
- final NetworkAddress dataport = producerNCS.getDataPort();
+ final NetworkAddress dataport = producerNCS.getDataAddress();
final INodeController requestorNC = requestorNCS.getNodeController();
requestorNC.reportPartitionAvailability(pid, dataport);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 0ea9239..b1700fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -51,7 +51,7 @@
String id = reg.getNodeId();
LOGGER.info("registering node: {}", id);
NodeControllerRemoteProxy nc = new NodeControllerRemoteProxy(ccs.getCcId(),
- ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
+ ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress().resolveInetSocketAddress()));
INodeManager nodeManager = ccs.getNodeManager();
NodeParameters params = new NodeParameters();
params.setClusterControllerInfo(ccs.getClusterControllerInfo());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index d92727c..5c85134 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -172,9 +172,9 @@
NetworkAddress resultAddr = new NetworkAddress(ipAddr, 1002);
NetworkAddress msgAddr = new NetworkAddress(ipAddr, 1003);
when(ncState.getCapacity()).thenReturn(new NodeCapacity(NODE_MEMORY_SIZE, NODE_CORES));
- when(ncState.getDataPort()).thenReturn(dataAddr);
- when(ncState.getResultPort()).thenReturn(resultAddr);
- when(ncState.getMessagingPort()).thenReturn(msgAddr);
+ when(ncState.getDataAddress()).thenReturn(dataAddr);
+ when(ncState.getResultAddress()).thenReturn(resultAddr);
+ when(ncState.getMessagingAddress()).thenReturn(msgAddr);
when(ncState.getConfig())
.thenReturn(Collections.singletonMap(NCConfig.Option.DATA_PUBLIC_ADDRESS.toSerializable(), ipAddr));
Mockito.when(ncState.getNodeController()).thenReturn(ncProxy);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
index 24a3e57..90a4432 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
@@ -47,11 +47,11 @@
private final Map<SerializedOption, Object> config;
- private final NetworkAddress dataPort;
+ private final NetworkAddress dataAddress;
- private final NetworkAddress resultPort;
+ private final NetworkAddress resultAddress;
- private final NetworkAddress messagingPort;
+ private final NetworkAddress messagingAddress;
private final Set<JobId> activeJobIds;
@@ -151,9 +151,9 @@
nodeId = reg.getNodeId();
config = Collections.unmodifiableMap(reg.getConfig());
- dataPort = reg.getDataPort();
- resultPort = reg.getResultPort();
- messagingPort = reg.getMessagingPort();
+ dataAddress = reg.getDataAddress();
+ resultAddress = reg.getResultAddress();
+ messagingAddress = reg.getMessagingAddress();
activeJobIds = new HashSet<>();
osName = reg.getOSName();
@@ -265,16 +265,16 @@
return activeJobIds;
}
- public NetworkAddress getDataPort() {
- return dataPort;
+ public NetworkAddress getDataAddress() {
+ return dataAddress;
}
- public NetworkAddress getResultPort() {
- return resultPort;
+ public NetworkAddress getResultAddress() {
+ return resultAddress;
}
- public NetworkAddress getMessagingPort() {
- return messagingPort;
+ public NetworkAddress getMessagingAddress() {
+ return messagingAddress;
}
public NodeCapacity getCapacity() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index 474bc0a..d85f53c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -22,7 +22,6 @@
import static org.apache.hyracks.util.MXHelper.runtimeMXBean;
import java.io.Serializable;
-import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -37,15 +36,17 @@
import org.apache.hyracks.util.PidHelper;
public final class NodeRegistration implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final InetSocketAddress ncAddress;
+ private static final long serialVersionUID = 2L;
private final String nodeId;
- private final NetworkAddress dataPort;
+ private final NetworkAddress ncAddress;
- private final NetworkAddress resultPort;
+ private final NetworkAddress dataAddress;
+
+ private final NetworkAddress resultAddress;
+
+ private final NetworkAddress messagingAddress;
private final String osName;
@@ -73,22 +74,21 @@
private final HeartbeatSchema hbSchema;
- private final NetworkAddress messagingPort;
-
private final long pid;
private final NodeCapacity capacity;
private final HashMap<SerializedOption, Object> config;
- public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
- NetworkAddress resultPort, HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity) {
+ public NodeRegistration(NetworkAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataAddress,
+ NetworkAddress resultAddress, HeartbeatSchema hbSchema, NetworkAddress messagingAddress,
+ NodeCapacity capacity) {
this.ncAddress = ncAddress;
this.nodeId = nodeId;
- this.dataPort = dataPort;
- this.resultPort = resultPort;
+ this.dataAddress = dataAddress;
+ this.resultAddress = resultAddress;
this.hbSchema = hbSchema;
- this.messagingPort = messagingPort;
+ this.messagingAddress = messagingAddress;
this.capacity = capacity;
this.osName = osMXBean.getName();
this.arch = osMXBean.getArch();
@@ -110,7 +110,7 @@
}
}
- public InetSocketAddress getNodeControllerAddress() {
+ public NetworkAddress getNodeControllerAddress() {
return ncAddress;
}
@@ -126,12 +126,12 @@
return config;
}
- public NetworkAddress getDataPort() {
- return dataPort;
+ public NetworkAddress getDataAddress() {
+ return dataAddress;
}
- public NetworkAddress getResultPort() {
- return resultPort;
+ public NetworkAddress getResultAddress() {
+ return resultAddress;
}
public String getOSName() {
@@ -186,8 +186,8 @@
return systemProperties;
}
- public NetworkAddress getMessagingPort() {
- return messagingPort;
+ public NetworkAddress getMessagingAddress() {
+ return messagingAddress;
}
public long getPid() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 35cf57f..02cd184 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -240,11 +240,13 @@
nodeController.getExecutor().execute(() -> deallocatableRegistry.close());
}
- ByteBuffer allocateFrame() throws HyracksDataException {
+ @Override
+ public ByteBuffer allocateFrame() throws HyracksDataException {
return frameManager.allocateFrame();
}
- ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
+ @Override
+ public ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
if (serviceCtx.getMemoryManager().allocate(bytes)) {
memoryAllocation.addAndGet(bytes);
return frameManager.allocateFrame(bytes);
@@ -252,18 +254,21 @@
throw new HyracksDataException("Unable to allocate frame: Not enough memory");
}
- ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newFrameSizeInBytes, boolean copyOldData)
+ @Override
+ public ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newFrameSizeInBytes, boolean copyOldData)
throws HyracksDataException {
return frameManager.reallocateFrame(usedBuffer, newFrameSizeInBytes, copyOldData);
}
- void deallocateFrames(int bytes) {
+ @Override
+ public void deallocateFrames(int bytes) {
memoryAllocation.addAndGet(bytes);
serviceCtx.getMemoryManager().deallocate(bytes);
frameManager.deallocateFrames(bytes);
}
- public final int getFrameSize() {
+ @Override
+ public final int getInitialFrameSize() {
return frameManager.getInitialFrameSize();
}
@@ -271,7 +276,8 @@
return maxWarnings;
}
- public IIOManager getIOManager() {
+ @Override
+ public IIOManager getIoManager() {
return serviceCtx.getIoManager();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index bf07c20..ac2ef83 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -319,11 +319,11 @@
private void initNodeControllerState() {
// Use "public" versions of network addresses and ports, if defined
- InetSocketAddress ncAddress;
+ NetworkAddress ncAddress;
if (ncConfig.getClusterPublicPort() == 0) {
- ncAddress = ipc.getSocketAddress();
+ ncAddress = new NetworkAddress(ipc.getSocketAddress());
} else {
- ncAddress = new InetSocketAddress(ncConfig.getClusterPublicAddress(), ncConfig.getClusterPublicPort());
+ ncAddress = new NetworkAddress(ncConfig.getClusterPublicAddress(), ncConfig.getClusterPublicPort());
}
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
@@ -431,7 +431,7 @@
NodeParameters nodeParameters = ccc.getNodeParameters();
// Start heartbeat generator.
heartbeatManagers.computeIfAbsent(ccId, newCcId -> HeartbeatManager.init(this, ccc, hbTask.getHeartbeatData(),
- nodeRegistration.getNodeControllerAddress()));
+ nodeRegistration.getNodeControllerAddress().resolveInetSocketAddress()));
if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) {
Timer ccTimer = new Timer("Timer-" + ccId, true);
// Schedule profile dump generator.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index e58e4a4..158e24e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -139,7 +139,7 @@
this.taskAttemptId = taskId;
this.displayName = displayName;
this.executorService = executor;
- fileFactory = new WorkspaceFileFactory(this, joblet.getIOManager());
+ fileFactory = new WorkspaceFileFactory(this, joblet.getIoManager());
deallocatableRegistry = new DefaultDeallocatableRegistry();
counterMap = new HashMap<>();
opEnv = joblet.getEnvironment();
@@ -181,12 +181,12 @@
@Override
public int getInitialFrameSize() {
- return joblet.getFrameSize();
+ return joblet.getInitialFrameSize();
}
@Override
public IIOManager getIoManager() {
- return joblet.getIOManager();
+ return joblet.getIoManager();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index eadbcf7..fb2d4e9 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -26,7 +26,7 @@
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
@@ -49,35 +49,35 @@
private final List<ByteBuffer> buffers;
private final FrameTupleAccessor accessorBuild;
private final ITuplePartitionComputer tpcBuild;
- private IFrameTupleAccessor accessorProbe;
+ private final IFrameTupleAccessor accessorProbe;
private final ITuplePartitionComputer tpcProbe;
private final FrameTupleAppender appender;
- private final ITuplePairComparator tpComparator;
+ private ITuplePairComparator tpComparator;
private final boolean isLeftOuter;
private final ArrayTupleBuilder missingTupleBuild;
private final ISerializableTable table;
private final TuplePointer storedTuplePointer;
private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
private final IPredicateEvaluator predEvaluator;
- private TupleInFrameListAccessor tupleAccessor;
+ private final TupleInFrameListAccessor tupleAccessor;
// To release frames
- ISimpleFrameBufferManager bufferManager;
+ private final ISimpleFrameBufferManager bufferManager;
private final boolean isTableCapacityNotZero;
private static final Logger LOGGER = LogManager.getLogger();
- public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe,
- FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild,
- ITuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
+ public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
+ ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
+ ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
ISerializableTable table, IPredicateEvaluator predEval, ISimpleFrameBufferManager bufferManager)
throws HyracksDataException {
- this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, comparator, isLeftOuter,
- missingWritersBuild, table, predEval, false, bufferManager);
+ this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, isLeftOuter, missingWritersBuild, table,
+ predEval, false, bufferManager);
}
- public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe,
- FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild,
- ITuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
+ public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
+ ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
+ ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
ISerializableTable table, IPredicateEvaluator predEval, boolean reverse,
ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
this.table = table;
@@ -88,7 +88,6 @@
this.accessorProbe = accessorProbe;
this.tpcProbe = tpcProbe;
appender = new FrameTupleAppender(new VSizeFrame(ctx));
- tpComparator = comparator;
predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
if (isLeftOuter) {
@@ -105,11 +104,7 @@
reverseOutputOrder = reverse;
this.tupleAccessor = new TupleInFrameListAccessor(rDBuild, buffers);
this.bufferManager = bufferManager;
- if (table.getTableSize() != 0) {
- isTableCapacityNotZero = true;
- } else {
- isTableCapacityNotZero = false;
- }
+ this.isTableCapacityNotZero = table.getTableSize() != 0;
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("InMemoryHashJoin has been created for a table size of " + table.getTableSize()
+ " for Thread ID " + Thread.currentThread().getId() + ".");
@@ -126,6 +121,7 @@
storedTuplePointer.reset(bIndex, i);
// If an insertion fails, then tries to insert the same tuple pointer again after compacting the table.
if (!table.insert(entry, storedTuplePointer)) {
+ // TODO(ali): check whether the insertion still fails after compaction and handle that case
compactTableAndInsertAgain(entry, storedTuplePointer);
}
}
@@ -152,6 +148,15 @@
}
/**
+ * Sets the comparator; must be called before joining starts so the comparator has the right context.
+ *
+ * @param comparator the comparator to use for comparing the probe tuples against the build tuples
+ */
+ void setComparator(ITuplePairComparator comparator) {
+ tpComparator = comparator;
+ }
+
+ /**
* Reads the given tuple from the probe side and joins it with tuples from the build side.
* This method assumes that the accessorProbe is already set to the current probe frame.
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index ccca62d..33976a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -34,7 +35,6 @@
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -74,18 +74,8 @@
IBinaryHashFunctionFactory[] hashFunctionFactories0, IBinaryHashFunctionFactory[] hashFunctionFactories1,
ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int tableSize,
IPredicateEvaluatorFactory predEvalFactory, int memSizeInFrames) {
- super(spec, 2, 1);
- this.keys0 = keys0;
- this.keys1 = keys1;
- this.hashFunctionFactories0 = hashFunctionFactories0;
- this.hashFunctionFactories1 = hashFunctionFactories1;
- this.comparatorFactory = comparatorFactory;
- this.predEvaluatorFactory = predEvalFactory;
- outRecDescs[0] = recordDescriptor;
- this.isLeftOuter = false;
- this.nonMatchWriterFactories = null;
- this.tableSize = tableSize;
- this.memSizeInFrames = memSizeInFrames;
+ this(spec, keys0, keys1, hashFunctionFactories0, hashFunctionFactories1, comparatorFactory, predEvalFactory,
+ recordDescriptor, false, null, tableSize, memSizeInFrames);
}
public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
@@ -100,7 +90,7 @@
this.hashFunctionFactories1 = hashFunctionFactories1;
this.comparatorFactory = comparatorFactory;
this.predEvaluatorFactory = predEvalFactory;
- outRecDescs[0] = recordDescriptor;
+ this.outRecDescs[0] = recordDescriptor;
this.isLeftOuter = isLeftOuter;
this.nonMatchWriterFactories = missingWriterFactories1;
this.tableSize = tableSize;
@@ -125,11 +115,8 @@
builder.addBlockingEdge(hba, hpa);
}
- public static class HashBuildTaskState extends AbstractStateObject {
- private InMemoryHashJoin joiner;
-
- public HashBuildTaskState() {
- }
+ static class HashBuildTaskState extends AbstractStateObject {
+ InMemoryHashJoin joiner;
private HashBuildTaskState(JobId jobId, TaskId taskId) {
super(jobId, taskId);
@@ -160,21 +147,23 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
+ final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(hpaId, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
- final IMissingWriter[] nullWriters1 =
- isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
+ final IMissingWriter[] nullWriters1;
if (isLeftOuter) {
+ nullWriters1 = new IMissingWriter[nonMatchWriterFactories.length];
for (int i = 0; i < nonMatchWriterFactories.length; i++) {
nullWriters1[i] = nonMatchWriterFactories[i].createMissingWriter();
}
+ } else {
+ nullWriters1 = null;
}
final IPredicateEvaluator predEvaluator =
(predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
- final int memSizeInBytes = memSizeInFrames * ctx.getInitialFrameSize();
- final IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx, memSizeInBytes);
+ final int memSizeInBytes = memSizeInFrames * jobletCtx.getInitialFrameSize();
+ final IDeallocatableFramePool framePool = new DeallocatableFramePool(jobletCtx, memSizeInBytes);
final ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
@@ -186,12 +175,11 @@
new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories0).createPartitioner(ctx);
ITuplePartitionComputer hpc1 =
new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories1).createPartitioner(ctx);
- state = new HashBuildTaskState(ctx.getJobletContext().getJobId(),
- new TaskId(getActivityId(), partition));
- ISerializableTable table = new SerializableHashTable(tableSize, ctx, bufferManager);
- state.joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0), hpc0,
- new FrameTupleAccessor(rd1), rd1, hpc1, comparator, isLeftOuter, nullWriters1, table,
- predEvaluator, bufferManager);
+ state = new HashBuildTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
+ ISerializableTable table = new SerializableHashTable(tableSize, jobletCtx, bufferManager);
+ state.joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(rd0), hpc0,
+ new FrameTupleAccessor(rd1), rd1, hpc1, isLeftOuter, nullWriters1, table, predEvaluator,
+ bufferManager);
}
@Override
@@ -250,6 +238,7 @@
writer.open();
state = (HashBuildTaskState) ctx
.getStateObject(new TaskId(new ActivityId(getOperatorId(), 0), partition));
+ state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx));
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index d0f5a73..361d1ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
@@ -46,7 +46,7 @@
private final FrameTupleAccessor accessorInner;
private final FrameTupleAccessor accessorOuter;
private final FrameTupleAppender appender;
- private final ITuplePairComparator tpComparator;
+ private ITuplePairComparator tpComparator;
private final IFrame outBuffer;
private final IFrame innerBuffer;
private final VariableFrameMemoryManager outerBufferMngr;
@@ -55,24 +55,23 @@
private final ArrayTupleBuilder missingTupleBuilder;
private final IPredicateEvaluator predEvaluator;
private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
- private BufferInfo tempInfo = new BufferInfo(null, -1, -1);
+ private final BufferInfo tempInfo = new BufferInfo(null, -1, -1);
- public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorOuter, FrameTupleAccessor accessorInner,
- ITuplePairComparator comparatorsOuter2Inner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
+ public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
+ FrameTupleAccessor accessorInner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
IMissingWriter[] missingWriters) throws HyracksDataException {
this.accessorInner = accessorInner;
this.accessorOuter = accessorOuter;
this.appender = new FrameTupleAppender();
- this.tpComparator = comparatorsOuter2Inner;
- this.outBuffer = new VSizeFrame(ctx);
- this.innerBuffer = new VSizeFrame(ctx);
+ this.outBuffer = new VSizeFrame(jobletContext);
+ this.innerBuffer = new VSizeFrame(jobletContext);
this.appender.reset(outBuffer, true);
if (memSize < 3) {
throw new HyracksDataException("Not enough memory is available for Nested Loop Join");
}
- this.outerBufferMngr =
- new VariableFrameMemoryManager(new VariableFramePool(ctx, ctx.getInitialFrameSize() * (memSize - 2)),
- FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2));
+ this.outerBufferMngr = new VariableFrameMemoryManager(
+ new VariableFramePool(jobletContext, jobletContext.getInitialFrameSize() * (memSize - 2)),
+ FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2));
this.predEvaluator = predEval;
this.isReversed = false;
@@ -91,8 +90,8 @@
}
FileReference file =
- ctx.getJobletContext().createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
- runFileWriter = new RunFileWriter(file, ctx.getIoManager());
+ jobletContext.createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
+ runFileWriter = new RunFileWriter(file, jobletContext.getIoManager());
runFileWriter.open();
}
@@ -100,6 +99,15 @@
runFileWriter.nextFrame(buffer);
}
+ /**
+ * Must be called before starting to join to set the right comparator with the right context.
+ *
+ * @param comparator the comparator to use for comparing the probe tuples against the build tuples
+ */
+ void setComparator(ITuplePairComparator comparator) {
+ tpComparator = comparator;
+ }
+
public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
RunFileReader runFileReader = runFileWriter.createReader();
@@ -131,7 +139,7 @@
for (int i = 0; i < tupleCount0; ++i) {
boolean matchFound = false;
for (int j = 0; j < tupleCount1; ++j) {
- int c = compare(accessorOuter, i, accessorInner, j);
+ int c = tpComparator.compare(accessorOuter, i, accessorInner, j);
boolean prdEval = evaluatePredicate(i, j);
if (c == 0 && prdEval) {
matchFound = true;
@@ -195,15 +203,6 @@
outerBufferMngr.reset();
}
- private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
- throws HyracksDataException {
- int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
- if (c != 0) {
- return c;
- }
- return 0;
- }
-
public void setIsReversed(boolean b) {
this.isReversed = b;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 2236056..1de8094 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -31,7 +32,6 @@
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -114,9 +114,9 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
+ final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
final IPredicateEvaluator predEvaluator =
(predEvaluatorFactory != null) ? predEvaluatorFactory.createPredicateEvaluator() : null;
@@ -132,17 +132,15 @@
@Override
public void open() throws HyracksDataException {
- state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(),
- new TaskId(getActivityId(), partition));
-
- state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0), new FrameTupleAccessor(rd1),
- comparator, memSize, predEvaluator, isLeftOuter, nullWriters1);
+ state = new JoinCacheTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
+ state.joiner = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(rd0),
+ new FrameTupleAccessor(rd1), memSize, predEvaluator, isLeftOuter, nullWriters1);
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity());
+ ByteBuffer copyBuffer = jobletCtx.allocateFrame(buffer.capacity());
FrameUtils.copyAndFlip(buffer, copyBuffer);
state.joiner.cache(copyBuffer);
}
@@ -180,6 +178,7 @@
writer.open();
state = (JoinCacheTaskState) ctx.getStateObject(
new TaskId(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID), partition));
+ state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx));
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 6fb55ec..dba21d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
@@ -62,11 +62,10 @@
PROBE
}
- private final IHyracksTaskContext ctx;
+ private final IHyracksJobletContext jobletCtx;
private final String buildRelName;
private final String probeRelName;
- private final ITuplePairComparator comparator;
private final ITuplePartitionComputer buildHpc;
private final ITuplePartitionComputer probeHpc;
private final RecordDescriptor buildRd;
@@ -95,17 +94,16 @@
private final TuplePointer tempPtr = new TuplePointer();
private int[] probePSizeInTups;
- public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
- String probeRelName, String buildRelName, ITuplePairComparator comparator, RecordDescriptor probeRd,
- RecordDescriptor buildRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
- IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
- this.ctx = ctx;
+ public OptimizedHybridHashJoin(IHyracksJobletContext jobletCtx, int memSizeInFrames, int numOfPartitions,
+ String probeRelName, String buildRelName, RecordDescriptor probeRd, RecordDescriptor buildRd,
+ ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval,
+ boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
+ this.jobletCtx = jobletCtx;
this.memSizeInFrames = memSizeInFrames;
this.buildRd = buildRd;
this.probeRd = probeRd;
this.buildHpc = buildHpc;
this.probeHpc = probeHpc;
- this.comparator = comparator;
this.buildRelName = buildRelName;
this.probeRelName = probeRelName;
this.numOfPartitions = numOfPartitions;
@@ -127,7 +125,7 @@
public void initBuild() throws HyracksDataException {
IDeallocatableFramePool framePool =
- new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
+ new DeallocatableFramePool(jobletCtx, memSizeInFrames * jobletCtx.getInitialFrameSize());
bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
bufferManager = new VPartitionTupleBufferManager(
PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
@@ -177,8 +175,8 @@
int pid) throws HyracksDataException {
RunFileWriter writer = runFileWriters[pid];
if (writer == null) {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
- writer = new RunFileWriter(file, ctx.getIoManager());
+ FileReference file = jobletCtx.createManagedWorkspaceFile(refName);
+ writer = new RunFileWriter(file, jobletCtx.getIoManager());
writer.open();
runFileWriters[pid] = writer;
}
@@ -194,10 +192,10 @@
// and tries to bring back as many spilled partitions as possible if there is free space.
int inMemTupCount = makeSpaceForHashTableAndBringBackSpilledPartitions();
- ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
- this.inMemJoiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRd), probeHpc,
- new FrameTupleAccessor(buildRd), buildRd, buildHpc, comparator, isLeftOuter, nonMatchWriters, table,
- predEvaluator, isReversed, bufferManagerForHashTable);
+ ISerializableTable table = new SerializableHashTable(inMemTupCount, jobletCtx, bufferManagerForHashTable);
+ this.inMemJoiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRd), probeHpc,
+ new FrameTupleAccessor(buildRd), buildRd, buildHpc, isLeftOuter, nonMatchWriters, table, predEvaluator,
+ isReversed, bufferManagerForHashTable);
buildHashTable();
}
@@ -250,7 +248,7 @@
* @throws HyracksDataException
*/
private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
- int frameSize = ctx.getInitialFrameSize();
+ int frameSize = jobletCtx.getInitialFrameSize();
long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize;
int inMemTupCount = 0;
@@ -356,7 +354,7 @@
* @return partition id of selected partition to reload
*/
private int selectAPartitionToReload(long freeSpace, int pid, int inMemTupCount) {
- int frameSize = ctx.getInitialFrameSize();
+ int frameSize = jobletCtx.getInitialFrameSize();
// Add one frame to freeSpace to consider the one frame reserved for the spilled partition
long totalFreeSpace = freeSpace + frameSize;
if (totalFreeSpace > 0) {
@@ -379,7 +377,7 @@
try {
r.open();
if (reloadBuffer == null) {
- reloadBuffer = new VSizeFrame(ctx);
+ reloadBuffer = new VSizeFrame(jobletCtx);
}
while (r.nextFrame(reloadBuffer)) {
accessorBuild.reset(reloadBuffer.getBuffer());
@@ -430,8 +428,9 @@
}
}
- public void initProbe() {
+ public void initProbe(ITuplePairComparator comparator) {
probePSizeInTups = new int[numOfPartitions];
+ inMemJoiner.setComparator(comparator);
bufferManager.setConstrain(VPartitionTupleBufferManager.NO_CONSTRAIN);
}
@@ -465,7 +464,7 @@
VPartitionTupleBufferManager.calculateActualSize(null, accessorProbe.getTupleLength(tupleId));
// If the partition is at least half-full and insertion fails, that partition is preferred to get
// spilled, otherwise the biggest partition gets chosen as the victim.
- boolean modestCase = recordSize <= (ctx.getInitialFrameSize() / 2);
+ boolean modestCase = recordSize <= (jobletCtx.getInitialFrameSize() / 2);
int victim = (modestCase && bufferManager.getNumTuples(pid) > 0) ? pid
: spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
// This method is called for the spilled partitions, so we know that this tuple is going to get written to
@@ -493,7 +492,7 @@
private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
throws HyracksDataException {
if (bigProbeFrameAppender == null) {
- bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+ bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx));
}
RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(probeRFWriters, probeRelName, pid);
if (!bigProbeFrameAppender.append(accessorProbe, i)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 45cccec..1819b8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -190,7 +191,7 @@
}
//memorySize is the memory for join (we have already excluded the 2 buffers for in/out)
- private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
+ private static int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
throws HyracksDataException {
int numberOfPartitions = 0;
if (memorySize <= 2) {
@@ -260,8 +261,6 @@
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
- final ITuplePairComparator probComparator =
- tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
final IPredicateEvaluator predEvaluator =
(predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
@@ -284,9 +283,9 @@
state.memForJoin = memSizeInFrames - 2;
state.numOfPartitions =
getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
- state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
- PROBE_REL, BUILD_REL, probComparator, probeRd, buildRd, probeHpc, buildHpc, predEvaluator,
- isLeftOuter, nonMatchWriterFactories);
+ state.hybridHJ = new OptimizedHybridHashJoin(ctx.getJobletContext(), state.memForJoin,
+ state.numOfPartitions, PROBE_REL, BUILD_REL, probeRd, buildRd, probeHpc, buildHpc,
+ predEvaluator, isLeftOuter, nonMatchWriterFactories);
state.hybridHJ.initBuild();
if (LOGGER.isTraceEnabled()) {
@@ -373,8 +372,9 @@
}
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
private BuildAndPartitionTaskState state;
- private IFrame rPartbuff = new VSizeFrame(ctx);
+ private IFrame rPartbuff = new VSizeFrame(jobletCtx);
private FrameTupleAppender nullResultAppender = null;
private FrameTupleAccessor probeTupleAccessor;
@@ -386,7 +386,7 @@
new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
writer.open();
- state.hybridHJ.initProbe();
+ state.hybridHJ.initProbe(probComp);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase.");
@@ -480,7 +480,7 @@
new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories)
.createPartitioner(level);
- int frameSize = ctx.getInitialFrameSize();
+ int frameSize = jobletCtx.getInitialFrameSize();
long buildPartSize = (long) Math.ceil((double) buildSideReader.getFileSize() / (double) frameSize);
long probePartSize = (long) Math.ceil((double) probeSideReader.getFileSize() / (double) frameSize);
int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple);
@@ -575,7 +575,7 @@
assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
OptimizedHybridHashJoin rHHj;
int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
- rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, comp, probeRd,
+ rHHj = new OptimizedHybridHashJoin(jobletCtx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeRd,
buildRd, probeHpc, buildHpc, predEvaluator, isLeftOuter, nonMatchWriterFactories); //checked-confirmed
rHHj.setIsReversed(isReversed);
@@ -598,7 +598,7 @@
probeSideReader.open();
rPartbuff.reset();
try {
- rHHj.initProbe();
+ rHHj.initProbe(comp);
while (probeSideReader.nextFrame(rPartbuff)) {
rHHj.probe(rPartbuff.getBuffer(), writer);
}
@@ -696,7 +696,7 @@
private void appendNullToProbeTuples(RunFileReader probReader) throws HyracksDataException {
if (nullResultAppender == null) {
- nullResultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+ nullResultAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx));
}
if (probeTupleAccessor == null) {
probeTupleAccessor = new FrameTupleAccessor(probeRd);
@@ -725,14 +725,14 @@
&& bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
IDeallocatableFramePool framePool =
- new DeallocatableFramePool(ctx, state.memForJoin * ctx.getInitialFrameSize());
+ new DeallocatableFramePool(jobletCtx, state.memForJoin * jobletCtx.getInitialFrameSize());
ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
- ISerializableTable table = new SerializableHashTable(tabSize, ctx, bufferManager);
- InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRDesc), hpcRepProbe,
- new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, comp, isLeftOuter,
+ ISerializableTable table = new SerializableHashTable(tabSize, jobletCtx, bufferManager);
+ InMemoryHashJoin joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRDesc),
+ hpcRepProbe, new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, isLeftOuter,
nonMatchWriter, table, predEvaluator, isReversed, bufferManager);
-
+ joiner.setComparator(comp);
try {
bReader.open();
rPartbuff.reset();
@@ -788,12 +788,12 @@
boolean isReversed = outerRd == buildRd && innerRd == probeRd;
assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
ITuplePairComparator nljComptorOuterInner = isReversed ? buildComp : probComp;
- NestedLoopJoin nlj =
- new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd), new FrameTupleAccessor(innerRd),
- nljComptorOuterInner, memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
+ NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(outerRd),
+ new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
nlj.setIsReversed(isReversed);
+ nlj.setComparator(nljComptorOuterInner);
- IFrame cacheBuff = new VSizeFrame(ctx);
+ IFrame cacheBuff = new VSizeFrame(jobletCtx);
try {
innerReader.open();
while (innerReader.nextFrame(cacheBuff)) {
@@ -808,7 +808,7 @@
}
}
try {
- IFrame joinBuff = new VSizeFrame(ctx);
+ IFrame joinBuff = new VSizeFrame(jobletCtx);
outerReader.open();
try {
while (outerReader.nextFrame(joinBuff)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
index 4c6b70f..93739df 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -50,7 +50,7 @@
public class OptimizedHybridHashJoinTest {
int frameSize = 32768;
int totalNumberOfFrames = 10;
- IHyracksTaskContext ctx = TestUtils.create(frameSize);
+ IHyracksJobletContext ctx = TestUtils.create(frameSize).getJobletContext();
OptimizedHybridHashJoin hhj;
static IBinaryHashFunctionFamily[] propHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
static IBinaryHashFunctionFamily[] buildHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
@@ -150,8 +150,8 @@
private void testJoin(int memSizeInFrames, int numOfPartitions, VSizeFrame frame) throws HyracksDataException {
- hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, comparator,
- probeRd, buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
+ hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, probeRd,
+ buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
hhj.initBuild();
@@ -184,7 +184,7 @@
//to the in memory joiner. As such, only next frame is important.
}
};
- hhj.initProbe();
+ hhj.initProbe(comparator);
for (int i = 0; i < totalNumberOfFrames; i++) {
hhj.probe(frame.getBuffer(), writer);
checkOneFrameReservedPerSpilledPartitions();
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 4ca7f1a..0151dbf 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -151,7 +151,7 @@
@Override
public void addPendingCredits(int credit) {
- cSet.addPendingCredits(channelId, credit);
+ cSet.addPendingCredits(this, credit);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
index 179f42c..20dc8f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -143,13 +143,15 @@
}
}
- void addPendingCredits(int channelId, int delta) {
+ void addPendingCredits(ChannelControlBlock targetCcb, int delta) {
if (delta <= 0) {
return;
}
synchronized (mConn) {
+ final int channelId = targetCcb.getChannelId();
ChannelControlBlock ccb = ccbArray[channelId];
- if (ccb != null) {
+ // ensure the channel slot id was not recycled and used for a different channel
+ if (ccb == targetCcb) {
if (ccb.getRemoteEOS()) {
return;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index 2809343..c485b32 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -34,31 +34,33 @@
import org.apache.hyracks.control.nc.resources.memory.FrameManager;
public class TestJobletContext implements IHyracksJobletContext {
- private final int frameSize;
+
private final INCServiceContext serviceContext;
private final FrameManager frameManger;
- private JobId jobId;
- private WorkspaceFileFactory fileFactory;
+ private final JobId jobId;
+ private final WorkspaceFileFactory fileFactory;
private final long jobStartTime;
- public TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
- this.frameSize = frameSize;
+ TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
this.serviceContext = serviceContext;
this.jobId = jobId;
- fileFactory = new WorkspaceFileFactory(this, getIOManager());
+ fileFactory = new WorkspaceFileFactory(this, getIoManager());
this.frameManger = new FrameManager(frameSize);
this.jobStartTime = System.currentTimeMillis();
}
- ByteBuffer allocateFrame() throws HyracksDataException {
+ @Override
+ public ByteBuffer allocateFrame() throws HyracksDataException {
return frameManger.allocateFrame();
}
+ @Override
public ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
return frameManger.allocateFrame(bytes);
}
- ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData)
+ @Override
+ public ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData)
throws HyracksDataException {
return frameManger.reallocateFrame(tobeDeallocate, newFrameSizeInBytes, copyOldData);
}
@@ -67,15 +69,18 @@
return null;
}
- void deallocateFrames(int bytes) {
+ @Override
+ public void deallocateFrames(int bytes) {
frameManger.deallocateFrames(bytes);
}
- public int getFrameSize() {
- return frameSize;
+ @Override
+ public final int getInitialFrameSize() {
+ return frameManger.getInitialFrameSize();
}
- public IIOManager getIOManager() {
+ @Override
+ public IIOManager getIoManager() {
return serviceContext.getIoManager();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 7c602f5..dcb85f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -85,12 +85,12 @@
@Override
public int getInitialFrameSize() {
- return jobletContext.getFrameSize();
+ return jobletContext.getInitialFrameSize();
}
@Override
public IIOManager getIoManager() {
- return jobletContext.getIOManager();
+ return jobletContext.getIoManager();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
index fe60653..7d162ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
@@ -31,6 +32,7 @@
public class FileUtil {
private static final Logger LOGGER = LogManager.getLogger();
+ // Matches a single "<name><sep>.." pair, where <sep> is '/' or '\':
+ // - group 1: the separator in front of <name>, or empty when <name> starts the input
+ // - <name>: one or more characters other than '.', '/' and '\'
+ // - group 2: the separator following "..", or empty when ".." ends the input
+ private static final Pattern PARENT_DIR = Pattern.compile("([/\\\\]|^)[^./\\\\]+[/\\\\]\\.\\.([/\\\\]|$)");
private static final Object LOCK = new Object();
private static final int MAX_COPY_ATTEMPTS = 3;
@@ -95,4 +97,22 @@
raf.getChannel().force(true);
}
}
+
+    /**
+     * Simplifies a path purely textually by removing every {@code <name><sep>..} pair matched by
+     * {@link #PARENT_DIR}; the file system is never consulted. Both {@code /} and {@code \} count as separators
+     * and the separators that remain are kept as written. Anything the pattern does not match -- e.g. a leading
+     * {@code ..}, or a directory whose name contains a {@code .} -- is returned unchanged.
+     * <p>
+     * A pair at the very start of the path keeps its leading separator, so {@code /foo/..} yields {@code /}
+     * rather than the empty string. NOTE: a drive-qualified root such as {@code C:\foo\..} still yields
+     * {@code C:}.
+     *
+     * @param path the path to simplify; must not be {@code null}
+     * @return the simplified path
+     */
+    public static String canonicalize(CharSequence path) {
+        String newPath = path.toString();
+        Matcher matcher = PARENT_DIR.matcher(newPath);
+        while (matcher.find()) {
+            // group(1): separator before <name> (empty at start of input); group(2): separator after ".."
+            // (empty at end of input)
+            boolean trailing = matcher.group(2).isEmpty();
+            // a trailing pair is dropped together with its leading separator ("foo/bar/.." -> "foo"), unless it
+            // begins the path, where dropping the separator would turn the root "/foo/.." into ""
+            String kept = trailing && matcher.start() != 0 ? "" : matcher.group(1);
+            // splice directly instead of appendReplacement(), which needs a StringBuffer on Java 8 and
+            // backslash escaping of the replacement
+            newPath = newPath.substring(0, matcher.start()) + kept + newPath.substring(matcher.end());
+            // removing a pair may expose a new one to its left ("a/b/../.."), so rescan from the beginning
+            matcher.reset(newPath);
+        }
+        return newPath;
+    }
+
+    /**
+     * {@link File} flavour of {@link #canonicalize(CharSequence)}: simplifies the path string of the given file
+     * and wraps the outcome in a new {@link File}.
+     *
+     * @param file the file whose path is to be simplified; must not be {@code null}
+     * @return a new {@link File} built from the simplified path string
+     */
+    public static File canonicalize(File file) {
+        final String simplified = canonicalize(file.getPath());
+        return new File(simplified);
+    }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/file/FileUtilTest.java b/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/file/FileUtilTest.java
index 8d6c631..87c730c 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/file/FileUtilTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/file/FileUtilTest.java
@@ -41,4 +41,27 @@
joinPath('\\', "\\\\myserver\\tmp\\\\far\\baz\\\\\\\\lala"));
Assert.assertEquals("C:\\temp\\far\\baz\\lala", joinPath('\\', "C:\\temp\\\\far\\baz\\\\\\\\lala\\"));
}
+
+    /**
+     * Checks {@code FileUtil.canonicalize} against paths written with forward-slash separators.
+     */
+    @Test
+    public void testCanonicalize() {
+        // each row: { expected result, input path }
+        final String[][] cases = {
+                { "bat.txt", "foo/../bat.txt" },
+                { "bat.txt", "foo/bar/../../bat.txt" },
+                { "foo/", "foo/bar/../" },
+                { "foo", "foo/bar/.." },
+                { "../bat.txt", "../bat.txt" },
+                { "/bat.txt", "/foo/bar/../../bat.txt" },
+                { "/bar/bat.txt", "/foo/../bar/bat.txt" },
+        };
+        for (String[] row : cases) {
+            Assert.assertEquals(row[0], FileUtil.canonicalize(row[1]));
+        }
+    }
+
+    /**
+     * Checks {@code FileUtil.canonicalize} against paths written with backslash separators, including one
+     * path that carries a drive prefix.
+     */
+    @Test
+    public void testCanonicalizeWindoze() {
+        // each row: { expected result, input path }
+        final String[][] cases = {
+                { "bat.txt", "foo\\..\\bat.txt" },
+                { "bat.txt", "foo\\bar\\..\\..\\bat.txt" },
+                { "foo\\", "foo\\bar\\..\\" },
+                { "foo", "foo\\bar\\.." },
+                { "..\\bat.txt", "..\\bat.txt" },
+                { "\\bat.txt", "\\foo\\bar\\..\\..\\bat.txt" },
+                { "\\bar\\bat.txt", "\\foo\\..\\bar\\bat.txt" },
+                { "C:\\bar\\bat.txt", "C:\\foo\\..\\bar\\bat.txt" },
+        };
+        for (String[] row : cases) {
+            Assert.assertEquals(row[0], FileUtil.canonicalize(row[1]));
+        }
+    }
}