Merge commit '94b6da6ae33749dfdee35e35f32c165693015e3d' from release-0.9.4-pre-rc
Change-Id: I73edc93888a80687416cc80014ffebc2683c68f6
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 494198b..57d080e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -211,6 +211,7 @@
? getCurrentSystemState() : SystemState.HEALTHY;
RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
currentStatus, systemState);
+ // Report the registration with this CC as complete only after the registration-tasks request has been sent.
+ ncs.notifyRegistrationCompleted(ccId);
}
@Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 8c0a3d4..80048bd 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -154,6 +154,7 @@
protected int endpointSelector;
protected IExternalUDFLibrarian librarian;
private Map<File, TestLoop> testLoops = new HashMap<>();
+ private double timeoutMultiplier = 1;
public TestExecutor() {
this(Inet4Address.getLoopbackAddress().getHostAddress(), 19002);
@@ -183,6 +184,20 @@
this.replicationAddress = replicationAddress;
}
+ /**
+ * Sets the factor applied to the timeouts used by getTimeoutSecs, waitForClusterState and the port-inactivity
+ * wait. Each scaled timeout is truncated toward zero to an {@code int}.
+ *
+ * @param timeoutMultiplier a strictly positive factor (the default is 1)
+ * @throws IllegalArgumentException if {@code timeoutMultiplier} is zero, negative or NaN
+ */
+ public void setTimeoutMultiplier(double timeoutMultiplier) {
+ if (!(timeoutMultiplier > 0)) {
+ throw new IllegalArgumentException("timeoutMultiplier must be positive: " + timeoutMultiplier);
+ }
+ this.timeoutMultiplier = timeoutMultiplier;
+ }
+
/**
* Probably does not work well with symlinks.
*/
@@ -1449,10 +1464,10 @@
return false;
}
- public static int getTimeoutSecs(String statement) {
+ public int getTimeoutSecs(String statement) {
final Matcher timeoutMatcher = POLL_TIMEOUT_PATTERN.matcher(statement);
if (timeoutMatcher.find()) {
- return Integer.parseInt(timeoutMatcher.group(1));
+ return (int) (Integer.parseInt(timeoutMatcher.group(1)) * timeoutMultiplier);
} else {
throw new IllegalArgumentException("ERROR: polltimeoutsecs=nnn must be present in poll file");
}
@@ -1796,7 +1811,8 @@
waitForClusterState("ACTIVE", timeoutSecs, timeUnit);
}
- public void waitForClusterState(String desiredState, int timeout, TimeUnit timeUnit) throws Exception {
+ public void waitForClusterState(String desiredState, int baseTimeout, TimeUnit timeUnit) throws Exception {
+ int timeout = (int) (baseTimeout * timeoutMultiplier);
LOGGER.info("Waiting for cluster state " + desiredState + "...");
Thread t = new Thread(() -> {
while (true) {
@@ -1874,7 +1890,7 @@
}
String host = command[0];
int port = Integer.parseInt(command[1]);
- int timeoutSec = Integer.parseInt(command[2]);
+ int timeoutSec = (int) (Integer.parseInt(command[2]) * timeoutMultiplier);
while (isPortActive(host, port)) {
TimeUnit.SECONDS.sleep(1);
timeoutSec--;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index e1a36cd..b44a6bb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -103,14 +103,13 @@
if (nodeRegistry.containsKey(nodeId)) {
LOGGER.warn("Node with name " + nodeId + " has already registered; failing the node then re-registering.");
failNode(nodeId);
- } else {
- try {
- // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
- IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
- ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), null);
- } catch (IPCException e) {
- throw HyracksDataException.create(e);
- }
+ }
+ // Send AbortCCJobsFunction for this CC to the NC in both cases: a node re-registering (after failNode above)
+ // and a node registering for the first time.
+ try {
+ // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
+ IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
+ ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), null);
+ } catch (IPCException e) {
+ throw HyracksDataException.create(e);
}
LOGGER.warn("adding node to registry");
nodeRegistry.put(nodeId, ncState);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 98f5c70..6d54843 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -363,8 +363,7 @@
};
ClusterControllerRemoteProxy ccProxy = new ClusterControllerRemoteProxy(
ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
- CcConnection ccc = new CcConnection(ccProxy);
- return registerNode(ccc, ccAddress);
+ return registerNode(new CcConnection(ccProxy), ccAddress);
}
}
@@ -415,7 +414,6 @@
public CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
LOGGER.info("Registering with Cluster Controller {}", ccc);
-
int registrationId = nextRegistrationId.incrementAndGet();
pendingRegistrations.put(registrationId, ccc);
CcId ccId = ccc.registerNode(nodeRegistration, registrationId);
@@ -425,10 +423,8 @@
if (distributedState != null) {
getDistributedState().put(ccId, distributedState);
}
- application.onRegisterNode(ccId);
IClusterController ccs = ccc.getClusterControllerService();
NodeParameters nodeParameters = ccc.getNodeParameters();
-
// Start heartbeat generator.
if (!heartbeatThreads.containsKey(ccId)) {
Thread heartbeatThread = new Thread(
@@ -445,8 +441,6 @@
ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod());
ccTimers.put(ccId, ccTimer);
}
- ccc.notifyRegistrationCompleted();
- LOGGER.info("Registering with Cluster Controller {} complete", ccc);
+ // Completion of this registration is reported separately, through notifyRegistrationCompleted(CcId).
return ccId;
}
@@ -708,7 +702,6 @@
}
private class TraceCurrentTimeTask extends TimerTask {
-
private ITracer tracer;
private long traceCategory;
@@ -726,4 +719,14 @@
}
}
}
+
+ /**
+ * @return the {@code application} instance held by this node controller service
+ */
+ public INCApplication getApplication() {
+ return application;
+ }
+
+ /**
+ * Marks the registration with the given cluster controller as completed on its {@link CcConnection} and logs
+ * it. Called by NCApplication after it has sent its registration-tasks request for {@code ccId}.
+ * NOTE(review): assumes getCcConnection(ccId) is non-null for that CC; otherwise this throws a
+ * NullPointerException.
+ *
+ * @param ccId the cluster controller whose registration completed
+ */
+ public void notifyRegistrationCompleted(CcId ccId) {
+ CcConnection ccc = getCcConnection(ccId);
+ ccc.notifyRegistrationCompleted();
+ LOGGER.info("Registering with Cluster Controller {} complete", ccc);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 9d99968..d7b930c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -22,6 +22,7 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.LinkedHashSet;
@@ -112,6 +113,8 @@
private final IStatsCollector statsCollector;
+ // Set once the task's execution has gone through the finally block that closes the task and removes its
+ // thread from the pending threads; volatile because it is read from other threads via isCompleted().
+ private volatile boolean completed = false;
+
public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName,
ExecutorService executor, NodeControllerService ncs,
List<List<PartitionChannel>> inputChannelsFromConnectors) {
@@ -255,8 +258,11 @@
if (aborted) {
return false;
}
- pendingThreads.add(t);
- return true;
+ // NOTE(review): this now also returns false when t is already pending (assuming pendingThreads is a Set),
+ // and callers treat false as "aborted" -- confirm a thread can never be registered twice.
+ return pendingThreads.add(t);
+ }
+
+ /**
+ * @return a copy of the threads currently registered as pending, taken while holding this task's monitor
+ */
+ public synchronized List<Thread> getPendingThreads() {
+ return new ArrayList<>(pendingThreads);
}
private synchronized void removePendingThread(Thread t) {
@@ -300,8 +306,6 @@
executorService.execute(() -> {
try {
Thread thread = Thread.currentThread();
- // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
- // the thread is not escaped from interruption.
if (!addPendingThread(thread)) {
return;
}
@@ -345,6 +349,7 @@
} finally {
close();
removePendingThread(ct);
+ // NOTE(review): only executions that reach this finally block mark the task completed; confirm that a task
+ // aborted before it starts running is marked as well, or code polling isCompleted() will wait in vain.
+ completed = true;
}
if (!exceptions.isEmpty()) {
if (LOGGER.isWarnEnabled()) {
@@ -460,4 +465,8 @@
public IStatsCollector getStatsCollector() {
return statsCollector;
}
+
+ /**
+ * @return whether the task has been marked completed; see the comment on the {@code completed} field
+ */
+ public boolean isCompleted() {
+ return completed;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IIOCounter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IIOCounter.java
index 3612d8f..a85ca2c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IIOCounter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IIOCounter.java
@@ -22,12 +22,12 @@
+/**
+ * Source of cumulative block I/O counters. Implementations return -1 for a counter that is not available on the
+ * current platform.
+ */
public interface IIOCounter {
/**
- * @return the number of block reads from the very beginning
+ * @return the number of block reads from the very beginning, or -1 if not available on this platform
*/
- public long getReads();
+ long getReads();
/**
- * @return the number of block writes from the very beginning
+ * @return the number of block writes from the very beginning, or -1 if not available on this platform
*/
- public long getWrites();
+ long getWrites();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterCache.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterCache.java
new file mode 100644
index 0000000..842d82b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterCache.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.io.profiling;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.util.Span;
+
+/**
+ * Base class for {@link IIOCounter}s whose readings are parsed from a snapshot (file contents or command output).
+ * The snapshot is cached for 500ms, so reading several counters in quick succession reuses a single snapshot.
+ *
+ * @param <T> the type of the cached snapshot
+ */
+abstract class IOCounterCache<T> implements IIOCounter {
+ private static final long TTL_NANOS = TimeUnit.MILLISECONDS.toNanos(500);
+ private Span span;
+ private T info;
+
+ /**
+ * Returns the cached snapshot, recomputing it with {@link #calculateInfo()} when none is cached yet or the
+ * cached one has outlived the 500ms TTL.
+ *
+ * @throws IOException if a recomputation is needed and {@link #calculateInfo()} fails
+ */
+ protected synchronized T getInfo() throws IOException {
+ if (info == null || span.elapsed()) {
+ // compute before restarting the span: if calculateInfo() throws, the expired span is kept so the next
+ // call retries instead of serving the stale snapshot for another full TTL
+ info = calculateInfo();
+ span = Span.start(TTL_NANOS, TimeUnit.NANOSECONDS);
+ }
+ return info;
+ }
+
+ /**
+ * Computes a fresh snapshot. Invoked from {@link #getInfo()} while holding this object's monitor.
+ */
+ protected abstract T calculateInfo() throws IOException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterDefault.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterDefault.java
index 1f8669d..d5ec9a0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterDefault.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterDefault.java
@@ -21,14 +21,16 @@
+/**
+ * I/O counter that reports both counters as {@link #IO_COUNTER_UNAVAILABLE}; used when no platform-specific
+ * counter applies.
+ */
public class IOCounterDefault implements IIOCounter {
+ /**
+ * Value returned by {@link IIOCounter} implementations for a counter that is not available.
+ */
+ public static final long IO_COUNTER_UNAVAILABLE = -1;
+
@Override
public long getReads() {
- return 0;
+ return IO_COUNTER_UNAVAILABLE;
}
@Override
public long getWrites() {
- return 0;
+ return IO_COUNTER_UNAVAILABLE;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
index 2301ae6..36b310d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
@@ -33,9 +33,14 @@
*/
public IIOCounter getIOCounter() {
- String osName = System.getProperty("os.name").toLowerCase();
- if (osName.indexOf("nix") >= 0 || osName.indexOf("nux") >= 0 || osName.indexOf("aix") >= 0) {
- return new IOCounterLinux();
- } else if (osName.indexOf("mac") >= 0) {
+ // Locale.ROOT: in some default locales (e.g. Turkish) lowercasing maps 'I' to a dotless i, so "AIX" would
+ // no longer contain "aix" and the platform would be misdetected
+ String osName = System.getProperty("os.name").toLowerCase(java.util.Locale.ROOT);
+ if (osName.contains("nix") || osName.contains("nux") || osName.contains("aix")) {
+ if (IOCounterProc.STATFILE.exists()) {
+ return new IOCounterProc();
+ }
+ return new IOCounterIoStat();
+ } else if (osName.contains("mac")) {
return new IOCounterOSX();
} else {
return new IOCounterDefault();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterIoStat.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterIoStat.java
new file mode 100644
index 0000000..560035b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterIoStat.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.io.profiling;
+
+import static org.apache.hyracks.control.nc.io.profiling.IOCounterDefault.IO_COUNTER_UNAVAILABLE;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * {@link IIOCounter} that parses the output of the {@code iostat} command: a column is summed over every row that
+ * follows the header line containing {@code Blk_read}, converted to bytes and reported relative to the total seen
+ * when this counter was created.
+ */
+public class IOCounterIoStat extends IOCounterCache<List<String>> {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final String COMMAND = "iostat";
+ // bytes per iostat block; assumes iostat's 512-byte block (sector) unit -- confirm against the iostat man page
+ private static final int PAGE_SIZE = 512;
+ // zero-based, whitespace-separated column indexes of the read / written block totals in a device row
+ private static final int READ_COLUMN = 4;
+ private static final int WRITE_COLUMN = 5;
+ private long failureCount;
+
+ private final long baseReads;
+ private final long baseWrites;
+
+ IOCounterIoStat() {
+ // Read the raw totals directly instead of via the overridable getReads()/getWrites(): those return
+ // IO_COUNTER_UNAVAILABLE (-1) when nothing can be read, which would then become the baseline and offset
+ // every later reading by one.
+ baseReads = currentBytes(READ_COLUMN);
+ baseWrites = currentBytes(WRITE_COLUMN);
+ }
+
+ @Override
+ public long getReads() {
+ try {
+ long reads = extractColumn(READ_COLUMN) * PAGE_SIZE;
+ return reads == 0 ? IO_COUNTER_UNAVAILABLE : reads - baseReads;
+ } catch (Exception e) {
+ LOGGER.log(failureCount++ > 0 ? Level.DEBUG : Level.WARN, "Failure getting reads", e);
+ return IO_COUNTER_UNAVAILABLE;
+ }
+ }
+
+ @Override
+ public long getWrites() {
+ try {
+ long writes = extractColumn(WRITE_COLUMN) * PAGE_SIZE;
+ return writes == 0 ? IO_COUNTER_UNAVAILABLE : writes - baseWrites;
+ } catch (Exception e) {
+ LOGGER.log(failureCount++ > 0 ? Level.DEBUG : Level.WARN, "Failure getting writes", e);
+ return IO_COUNTER_UNAVAILABLE;
+ }
+ }
+
+ /**
+ * Returns the current total of the given column in bytes, or 0 if it cannot be read. Used for the baseline.
+ */
+ private long currentBytes(int columnIndex) {
+ try {
+ return extractColumn(columnIndex) * PAGE_SIZE;
+ } catch (Exception e) {
+ LOGGER.log(failureCount++ > 0 ? Level.DEBUG : Level.WARN, "Failure getting baseline", e);
+ return 0;
+ }
+ }
+
+ /**
+ * Sums the given column over all rows after the {@code Blk_read} header line of the (cached) iostat output.
+ */
+ private long extractColumn(int columnIndex) throws IOException {
+ boolean device = false;
+ long ios = 0;
+ for (String line : getInfo()) {
+ if (line.contains("Blk_read")) {
+ device = true;
+ continue;
+ }
+ if (device) {
+ StringTokenizer tokenizer = new StringTokenizer(line);
+ int i = 0;
+ while (tokenizer.hasMoreTokens()) {
+ String column = tokenizer.nextToken();
+ if (i == columnIndex) {
+ ios += Long.parseLong(column);
+ break;
+ }
+ i++;
+ }
+ }
+ }
+ return ios;
+ }
+
+ @Override
+ protected List<String> calculateInfo() throws IOException {
+ Process process = Runtime.getRuntime().exec(COMMAND);
+ try (InputStream inputStream = process.getInputStream()) {
+ return IOUtils.readLines(inputStream, Charset.defaultCharset());
+ } finally {
+ // make sure the iostat child process does not outlive this call, even if reading its output failed
+ process.destroy();
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterLinux.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterLinux.java
deleted file mode 100644
index 3db4c8c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterLinux.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.control.nc.io.profiling;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.StringTokenizer;
-
-public class IOCounterLinux implements IIOCounter {
- public static final String COMMAND = "iostat";
- public static final String STATFILE = "/proc/self/io";
- public static final int PAGE_SIZE = 4096;
-
- private long baseReads = 0;
- private long baseWrites = 0;
-
- public IOCounterLinux() {
- baseReads = getReads();
- baseWrites = getWrites();
- }
-
- @Override
- public long getReads() {
- try {
- long reads = extractRow(4);
- return reads;
- } catch (IOException e) {
- try {
- long reads = extractColumn(4) * PAGE_SIZE;
- return reads - baseReads;
- } catch (IOException e2) {
- return 0;
- }
- }
- }
-
- @Override
- public long getWrites() {
- try {
- long writes = extractRow(5);
- long cancelledWrites = extractRow(6);
- return (writes - cancelledWrites);
- } catch (IOException e) {
- try {
- long writes = extractColumn(5) * PAGE_SIZE;
- return writes - baseWrites;
- } catch (IOException e2) {
- return 0;
- }
- }
- }
-
- private long extractColumn(int columnIndex) throws IOException {
- BufferedReader reader = exec(COMMAND);
- String line = null;
- boolean device = false;
- long ios = 0;
- while ((line = reader.readLine()) != null) {
- if (line.contains("Blk_read")) {
- device = true;
- continue;
- }
- if (device == true) {
- StringTokenizer tokenizer = new StringTokenizer(line);
- int i = 0;
- while (tokenizer.hasMoreTokens()) {
- String column = tokenizer.nextToken();
- if (i == columnIndex) {
- ios += Long.parseLong(column);
- break;
- }
- i++;
- }
- }
- }
- reader.close();
- return ios;
- }
-
- private long extractRow(int rowIndex) throws IOException {
- BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(STATFILE)));
- String line = null;
- long ios = 0;
- int i = 0;
- while ((line = reader.readLine()) != null) {
- if (i == rowIndex) {
- StringTokenizer tokenizer = new StringTokenizer(line);
- int j = 0;
- while (tokenizer.hasMoreTokens()) {
- String column = tokenizer.nextToken();
- if (j == 1) {
- ios = Long.parseLong(column);
- break;
- }
- j++;
- }
- }
- i++;
- }
- reader.close();
- return ios;
- }
-
- private BufferedReader exec(String command) throws IOException {
- Process p = Runtime.getRuntime().exec(command);
- return new BufferedReader(new InputStreamReader(p.getInputStream()));
- }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterOSX.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterOSX.java
index 729157b..a48d55d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterOSX.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterOSX.java
@@ -19,16 +19,18 @@
package org.apache.hyracks.control.nc.io.profiling;
+import static org.apache.hyracks.control.nc.io.profiling.IOCounterDefault.IO_COUNTER_UNAVAILABLE;
+
+/**
+ * I/O counter used on macOS. It does not collect any statistics: both counters are reported as
+ * {@code IO_COUNTER_UNAVAILABLE}.
+ */
public class IOCounterOSX implements IIOCounter {
@Override
public long getReads() {
- return 0;
+ return IO_COUNTER_UNAVAILABLE;
}
@Override
public long getWrites() {
- return 0;
+ return IO_COUNTER_UNAVAILABLE;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterProc.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterProc.java
new file mode 100644
index 0000000..8882271
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterProc.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.io.profiling;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * {@link IIOCounter} that reads the Linux {@code /proc/self/io} file. Values are reported as read from the file,
+ * without subtracting a baseline; writes exclude the {@code cancelled_write_bytes} total.
+ */
+public class IOCounterProc extends IOCounterCache<List<String>> {
+ private static final Logger LOGGER = LogManager.getLogger();
+ @SuppressWarnings("squid:S1075") // hardcoded URI
+ public static final File STATFILE = new File("/proc/self/io");
+ // names of the "name: value" rows of STATFILE that the counters are computed from
+ private static final String READ_BYTES = "read_bytes";
+ private static final String WRITE_BYTES = "write_bytes";
+ private static final String CANCELLED_WRITE_BYTES = "cancelled_write_bytes";
+ private long failureCount;
+
+ @Override
+ public long getReads() {
+ try {
+ return extractValue(getInfo(), READ_BYTES);
+ } catch (Exception e) {
+ LOGGER.log(failureCount++ > 0 ? Level.DEBUG : Level.WARN, "Failure getting reads", e);
+ return IOCounterDefault.IO_COUNTER_UNAVAILABLE;
+ }
+ }
+
+ @Override
+ public long getWrites() {
+ try {
+ List<String> rows = getInfo();
+ long writes = extractValue(rows, WRITE_BYTES);
+ long cancelledWrites = extractValue(rows, CANCELLED_WRITE_BYTES);
+ return writes - cancelledWrites;
+ } catch (Exception e) {
+ LOGGER.log(failureCount++ > 0 ? Level.DEBUG : Level.WARN, "Failure getting writes", e);
+ return IOCounterDefault.IO_COUNTER_UNAVAILABLE;
+ }
+ }
+
+ /**
+ * Returns the value of the row named {@code key}, splitting each row on ':' and spaces.
+ *
+ * @throws IllegalStateException if STATFILE has no row with that name
+ */
+ private static long extractValue(List<String> rows, String key) {
+ for (String row : rows) {
+ String[] tokens = StringUtils.split(row, ": ");
+ if (tokens.length > 1 && key.equals(tokens[0])) {
+ return Long.parseLong(tokens[1]);
+ }
+ }
+ throw new IllegalStateException("no " + key + " entry in " + STATFILE);
+ }
+
+ @Override
+ protected List<String> calculateInfo() throws IOException {
+ return FileUtils.readLines(STATFILE, Charset.defaultCharset());
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 2bcf414..c6696fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -18,7 +18,9 @@
*/
package org.apache.hyracks.control.nc.work;
+import java.util.ArrayDeque;
import java.util.Collection;
+import java.util.Deque;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
@@ -50,13 +52,14 @@
if (dpm == null) {
LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId());
}
+ // every task aborted below, collected so that their completion can be awaited afterwards
+ Deque<Task> abortedTasks = new ArrayDeque<>();
Collection<Joblet> joblets = ncs.getJobletMap().values();
// TODO(mblow): should we have one jobletmap per cc?
joblets.stream().filter(joblet -> joblet.getJobId().getCcId().equals(ccId)).forEach(joblet -> {
- Collection<Task> tasks = joblet.getTaskMap().values();
- for (Task task : tasks) {
+ joblet.getTaskMap().values().forEach(task -> {
task.abort();
- }
+ abortedTasks.add(task);
+ });
final JobId jobId = joblet.getJobId();
if (dpm != null) {
dpm.abortReader(jobId);
@@ -64,5 +67,6 @@
}
ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE));
});
+ // Wait for the aborted tasks on the NC executor rather than in this work item; EnsureAllCcTasksCompleted only
+ // lets the registration with this CC complete once they have all finished.
+ ncs.getExecutor().submit(new EnsureAllCcTasksCompleted(ncs, ccId, abortedTasks));
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
new file mode 100644
index 0000000..156a5c9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.Task;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.Span;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Waits, on the calling thread, for the tasks that were aborted on behalf of a cluster controller to complete, then
+ * lets the NC application finish registering with that controller ({@code onRegisterNode}). If some of the tasks are
+ * still running once {@code TIMEOUT} has passed since this object was created, the stack traces of their pending
+ * threads are logged and the process is halted with {@link ExitUtil#NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS}.
+ */
+@SuppressWarnings({ "squid:S1181", "squid:S1166" })
+public class EnsureAllCcTasksCompleted implements Runnable {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ // how long to wait for the aborted tasks, in milliseconds
+ private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(2);
+ private final NodeControllerService ncs;
+ private final CcId ccId;
+ private final Deque<Task> abortedTasks;
+ private final Span span;
+
+ public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, Deque<Task> abortedTasks) {
+ this.ncs = ncs;
+ this.ccId = ccId;
+ this.abortedTasks = abortedTasks;
+ // derive the deadline from TIMEOUT so the wait and the timeout reported in the log cannot disagree
+ span = Span.start(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void run() {
+ try {
+ LOGGER.log(Level.INFO, "Ensuring all tasks of {} have completed", ccId);
+ while (!span.elapsed()) {
+ removeCompleted();
+ if (abortedTasks.isEmpty()) {
+ break;
+ }
+ LOGGER.log(Level.INFO, "{} tasks are still running", abortedTasks.size());
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // Check once a second
+ }
+ // re-check once more: tasks may have finished during the last sleep before the deadline
+ removeCompleted();
+ if (abortedTasks.isEmpty()) {
+ LOGGER.log(Level.INFO, "All tasks of {} have completed, Completing registration", ccId);
+ // all tasks have completed
+ ncs.getApplication().onRegisterNode(ccId);
+ } else {
+ LOGGER.log(Level.ERROR,
+ "Failed to abort all previous tasks associated with CC {} after {}ms. Giving up", ccId,
+ TIMEOUT);
+ LOGGER.log(Level.ERROR, "{} tasks failed to complete within timeout", abortedTasks.size());
+ abortedTasks.forEach(task -> {
+ List<Thread> pendingThreads = task.getPendingThreads();
+ LOGGER.log(Level.ERROR, "task {} was stuck. Stuck thread count = {}", task.getTaskAttemptId(),
+ pendingThreads.size());
+ pendingThreads.forEach(thread -> {
+ LOGGER.log(Level.ERROR, "Stuck thread trace: {}", Arrays.toString(thread.getStackTrace()));
+ });
+ });
+ ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+ }
+ } catch (Throwable th) {
+ try {
+ LOGGER.log(Level.ERROR, "Failed to abort all previous tasks associated with CC {}", ccId, th);
+ } catch (Throwable ignore) {
+ // Ignore logging errors
+ }
+ ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+ }
+ }
+
+ /**
+ * Drops the tasks that have completed from {@code abortedTasks}, keeping the order of the remaining ones.
+ */
+ private void removeCompleted() {
+ abortedTasks.removeIf(Task::isCompleted);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index db5cd13..1a17012 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -33,6 +33,7 @@
public static final int EC_ABNORMAL_TERMINATION = 1;
public static final int EC_FAILED_TO_STARTUP = 2;
public static final int EC_FAILED_TO_RECOVER = 3;
+ // exit code used when an NC cannot confirm that the tasks it aborted for a CC have all completed
+ public static final int NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4;
public static final int EC_UNHANDLED_EXCEPTION = 11;
public static final int EC_IMMEDIATE_HALT = 33;
public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
index 95db604..d8d6bb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
@@ -34,15 +34,26 @@
}
public boolean elapsed() {
- return remaining(TimeUnit.NANOSECONDS) > spanNanos;
+ return elapsed(TimeUnit.NANOSECONDS) > spanNanos;
}
- public long remaining(TimeUnit unit) {
+ /**
+ * @return the time that has passed since this span was started, in the given unit
+ */
+ public long elapsed(TimeUnit unit) {
return unit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
}
public void sleep(long sleep, TimeUnit unit) throws InterruptedException {
+ // never sleep past the end of the span: cap the requested sleep at the time remaining
TimeUnit.NANOSECONDS.sleep(Math.min(remaining(TimeUnit.NANOSECONDS), unit.toNanos(sleep)));
+ }
+
+ /**
+ * @return the time left before this span is exhausted, in the given unit; never negative
+ */
+ public long remaining(TimeUnit unit) {
+ return unit.convert(Long.max(spanNanos - elapsed(TimeUnit.NANOSECONDS), 0L), TimeUnit.NANOSECONDS);
}
public void loopUntilExhausted(ThrowingAction action) throws Exception {