Merge branch 'gerrit/march-hare'
Change-Id: Icc16af567564affc4a6547e221131ed46481facc
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 336839a..13027eca 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -87,7 +87,6 @@
import org.apache.asterix.translator.Receptionist;
import org.apache.asterix.util.MetadataBuiltinFunctions;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.config.IConfigManager;
@@ -114,7 +113,6 @@
private static final Logger LOGGER = LogManager.getLogger();
private static IAsterixStateProxy proxy;
- protected ICCServiceContext ccServiceCtx;
protected CCExtensionManager ccExtensionManager;
protected IStorageComponentProvider componentProvider;
protected WebManager webManager;
@@ -125,7 +123,6 @@
@Override
public void init(IServiceContext serviceCtx) throws Exception {
super.init(serviceCtx);
- ccServiceCtx = (ICCServiceContext) serviceCtx;
ccServiceCtx.setThreadFactory(
new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager()));
validateEnvironment();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index eba2049..2e5c09c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -72,7 +72,6 @@
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.translator.Receptionist;
import org.apache.asterix.util.MetadataBuiltinFunctions;
-import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IConfigManager;
@@ -94,8 +93,6 @@
public class NCApplication extends BaseNCApplication {
private static final Logger LOGGER = LogManager.getLogger();
-
- protected INCServiceContext ncServiceCtx;
protected NCExtensionManager ncExtensionManager;
private INcApplicationContext runtimeContext;
private String nodeId;
@@ -111,7 +108,7 @@
@Override
public void init(IServiceContext serviceCtx) throws Exception {
- ncServiceCtx = (INCServiceContext) serviceCtx;
+ super.init(serviceCtx);
configureLoggingLevel(ncServiceCtx.getAppConfig().getLoggingLevel(ExternalProperties.Option.LOG_LEVEL));
// set the node status initially to idle to indicate that it is pending booting
((NodeControllerService) serviceCtx.getControllerService()).setNodeStatus(NodeStatus.IDLE);
@@ -240,6 +237,7 @@
LOGGER.info("Duplicate attempt to stop ignored: " + nodeId);
}
}
+ super.stop();
}
@Override
@@ -249,6 +247,7 @@
@Override
public synchronized void startupCompleted() throws Exception {
+ super.startupCompleted();
// configure servlets after joining the cluster, so we can create HyracksClientConnection
configureServers();
webManager.start();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 4837a98..df16daf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -426,6 +426,7 @@
public void stop() throws Exception {
// ungraceful shutdown
webManager.stop();
+ super.stop();
}
}
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/quantifiers/query-ASTERIXDB-2696.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/quantifiers/query-ASTERIXDB-2696.sqlpp
new file mode 100644
index 0000000..3f2988a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/quantifiers/query-ASTERIXDB-2696.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+from [
+ { "a": 1, "b": [ 10, 20, 30] },
+ { "a": 1, "b": [ 40, 50, 60] },
+ { "a": 1, "b": [ 70, 80, 90] },
+ { "a": 2, "b": [ 100, 200, 300] },
+ { "a": 2, "b": [ 400, 500, 600] },
+ { "a": 2, "b": [ 700, 800, 900] }
+] t
+group by a
+select a, sum( case when some x in b satisfies x >= 50 then 1 else 0 end) as s;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/quantifiers/query-ASTERIXDB-2696.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/quantifiers/query-ASTERIXDB-2696.plan
new file mode 100644
index 0000000..26b18be
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/quantifiers/query-ASTERIXDB-2696.plan
@@ -0,0 +1,25 @@
+-- DISTRIBUTE_RESULT |LOCAL|
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- STREAM_PROJECT |LOCAL|
+ -- ASSIGN |LOCAL|
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- PRE_CLUSTERED_GROUP_BY[$$67] |LOCAL|
+ {
+ -- AGGREGATE |LOCAL|
+ -- AGGREGATE |LOCAL|
+ -- SUBPLAN |LOCAL|
+ {
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- UNNEST |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- STABLE_SORT [$$67(ASC)] |LOCAL|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- STREAM_PROJECT |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- UNNEST |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-2696/query-ASTERIXDB-2696.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-2696/query-ASTERIXDB-2696.1.query.sqlpp
new file mode 100644
index 0000000..3f2988a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/quantifiers/query-ASTERIXDB-2696/query-ASTERIXDB-2696.1.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+from [
+ { "a": 1, "b": [ 10, 20, 30] },
+ { "a": 1, "b": [ 40, 50, 60] },
+ { "a": 1, "b": [ 70, 80, 90] },
+ { "a": 2, "b": [ 100, 200, 300] },
+ { "a": 2, "b": [ 400, 500, 600] },
+ { "a": 2, "b": [ 700, 800, 900] }
+] t
+group by a
+select a, sum( case when some x in b satisfies x >= 50 then 1 else 0 end) as s;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/quantifiers/query-ASTERIXDB-2696/query-ASTERIXDB-2696.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/quantifiers/query-ASTERIXDB-2696/query-ASTERIXDB-2696.1.adm
new file mode 100644
index 0000000..fcf5cee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/quantifiers/query-ASTERIXDB-2696/query-ASTERIXDB-2696.1.adm
@@ -0,0 +1,2 @@
+{ "a": 1, "s": 2 }
+{ "a": 2, "s": 3 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 2045fb8..aa5fc5d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8332,6 +8332,11 @@
<output-dir compare="Text">query-ASTERIXDB-2307</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="quantifiers">
+ <compilation-unit name="query-ASTERIXDB-2696">
+ <output-dir compare="Text">query-ASTERIXDB-2696</output-dir>
+ </compilation-unit>
+ </test-case>
<!--
<test-case FilePath="quantifiers">
<compilation-unit name="everysat_02">
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/AsterixTestHelper.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/AsterixTestHelper.java
index 3840a00..cec0de7 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/AsterixTestHelper.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/AsterixTestHelper.java
@@ -88,7 +88,6 @@
deepSelectiveCopy(child, destChild, filter);
} else if (filter.accept(child)) {
FileUtil.safeCopyFile(child, destChild);
- return;
}
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index f2d2496..3226299 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -106,12 +106,9 @@
}
private void establishReplicaConnection() throws IOException {
- try {
- sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.getLocation());
- } catch (Exception e) {
- // try to re-resolve the address, in case our replica has had his IP address updated
- sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.refreshLocation());
- }
+ // try to re-resolve the address, in case our replica has had its IP address updated, and that is why the
+ // connection is unhealthy...
+ sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.refreshLocation());
}
public synchronized void close() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index 782a801..d803756 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -94,13 +94,10 @@
}
protected void establishReplicaConnection(INcApplicationContext appCtx) throws IOException {
- try {
- logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, resolvedLocation);
- } catch (Exception e) {
- // try to re-resolve the address, in case our replica has had his IP address updated
- resolvedLocation = NetworkUtil.refresh(resolvedLocation);
- logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, resolvedLocation);
- }
+ // try to re-resolve the address, in case our replica has had its IP address updated, and that is why
+ // the connection is unhealthy...
+ resolvedLocation = NetworkUtil.refresh(resolvedLocation);
+ logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, resolvedLocation);
}
private synchronized void closeLogReplicationChannel() {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index 64d1b61..54cbcd0 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -147,7 +148,7 @@
@Override
public Long visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
- return ONE;
+ return op.getDataSourceReference().getValue().getOperatorTag() == LogicalOperatorTag.SUBPLAN ? ONE : UNKNOWN;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
index 018f9fe..c92c677 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
@@ -25,6 +25,8 @@
import org.apache.hyracks.api.network.INetworkSecurityManager;
public interface IControllerService {
+ String getId();
+
void start() throws Exception;
void stop() throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index ffd2956..5d52ed9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -53,7 +53,7 @@
* completes, the current thread will be re-interrupted, if the original operation was interrupted.
*/
public static void doUninterruptibly(InterruptibleAction interruptible) {
- boolean interrupted = false;
+ boolean interrupted = Thread.interrupted();
try {
while (true) {
try {
@@ -75,7 +75,7 @@
* completes, the current thread will be re-interrupted, if the original operation was interrupted.
*/
public static void doExUninterruptibly(ThrowingAction interruptible) throws Exception {
- boolean interrupted = false;
+ boolean interrupted = Thread.interrupted();
try {
while (true) {
try {
@@ -98,7 +98,7 @@
* @return true if the original operation was interrupted, otherwise false
*/
public static boolean doUninterruptiblyGet(InterruptibleAction interruptible) {
- boolean interrupted = false;
+ boolean interrupted = Thread.interrupted();
while (true) {
try {
interruptible.run();
@@ -117,7 +117,7 @@
* @return true if the original operation was interrupted, otherwise false
*/
public static boolean doExUninterruptiblyGet(Callable<Void> interruptible) throws Exception {
- boolean interrupted = false;
+ boolean interrupted = Thread.interrupted();
boolean success = false;
while (true) {
try {
@@ -168,7 +168,7 @@
* the original operation was interrupted.
*/
public static void doIoUninterruptibly(IOInterruptibleAction interruptible) throws IOException {
- boolean interrupted = false;
+ boolean interrupted = Thread.interrupted();
try {
while (true) {
try {
@@ -177,6 +177,7 @@
} catch (ClosedByInterruptException | InterruptedException e) {
LOGGER.error("IO operation Interrupted. Retrying..", e);
interrupted = true;
+ //noinspection ResultOfMethodCallIgnored
Thread.interrupted();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index b41d5a2..efd42c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -21,6 +21,7 @@
import java.util.Arrays;
import org.apache.hyracks.api.application.ICCApplication;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.config.Section;
@@ -29,23 +30,27 @@
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.result.IJobResultCallback;
import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.control.common.ControllerShutdownHook;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.ControllerConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.LoggingConfigUtil;
import org.apache.logging.log4j.Level;
public class BaseCCApplication implements ICCApplication {
public static final ICCApplication INSTANCE = new BaseCCApplication();
+ protected ICCServiceContext ccServiceCtx;
private IConfigManager configManager;
+ private Thread shutdownHook;
protected BaseCCApplication() {
}
@Override
public void init(IServiceContext serviceCtx) throws Exception {
- // no-op
+ ccServiceCtx = (ICCServiceContext) serviceCtx;
}
@Override
@@ -56,13 +61,28 @@
}
@Override
- public void stop() throws Exception {
- // no-op
+ public void startupCompleted() throws Exception {
+ installShutdownHook();
}
@Override
- public void startupCompleted() throws Exception {
- // no-op
+ public void stop() throws Exception {
+ uninstallShutdownHook();
+ }
+
+ protected Thread createShutdownHook() {
+ return new ControllerShutdownHook(ccServiceCtx);
+ }
+
+ protected void installShutdownHook() {
+ shutdownHook = createShutdownHook();
+ ExitUtil.registerShutdownHook(shutdownHook);
+ }
+
+ protected void uninstallShutdownHook() {
+ if (shutdownHook != null) {
+ ExitUtil.unregisterShutdownHook(shutdownHook);
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 1e92f16..f11e7ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -209,11 +209,17 @@
}
@Override
+ public String getId() {
+ return "ClusterControllerService";
+ }
+
+ @Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.getRootDir()));
IIPCI ccIPCI = new ClusterControllerIPCI(this);
- clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()),
+ clusterIPC = new IPCSystem(
+ new InetSocketAddress(ccConfig.getClusterListenAddress(), ccConfig.getClusterListenPort()),
networkSecurityManager.getSocketChannelFactory(), ccIPCI, new CCNCFunctions.SerializerDeserializer());
IIPCI ciIPCI = new ClientInterfaceIPCI(this, jobIdFactory);
clientIPC =
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
index 194d27f..9c504ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
@@ -69,7 +69,9 @@
*/
nodeManager.apply(this::shutdownNode);
- ccs.getExecutor().execute(() -> {
+ // complete the rest of the tasks in a separate standalone thread, to better allow our worker & executor
+ // queues to drain during shutdown
+ Thread finalWork = new Thread(() -> {
try {
/*
* wait for all our acks
@@ -88,8 +90,11 @@
ExitUtil.exit(cleanShutdown ? EC_NORMAL_TERMINATION : EC_ABNORMAL_TERMINATION);
} catch (Exception e) {
callback.setException(e);
+ } finally {
+ shutdownStatus.notifyCcStopComplete();
}
- });
+ }, getClass().getSimpleName() + "-Helper");
+ finalWork.start();
} catch (Exception e) {
callback.setException(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCShutdownHook.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ControllerShutdownHook.java
similarity index 73%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCShutdownHook.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ControllerShutdownHook.java
index ffa02d6..806e42e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCShutdownHook.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ControllerShutdownHook.java
@@ -16,26 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.nc;
+package org.apache.hyracks.control.common;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
- * Shutdown hook that invokes {@link NodeControllerService#stop() stop} method.
+ * Shutdown hook that invokes {@link IControllerService#stop() stop} method.
* This shutdown hook must have a failsafe mechanism to halt the process in case the shutdown
* operation is hanging for any reason
*/
-public class NCShutdownHook extends Thread {
+public class ControllerShutdownHook extends Thread {
private static final Logger LOGGER = LogManager.getLogger();
- private final NodeControllerService nodeControllerService;
+ private final IControllerService controllerService;
- NCShutdownHook(NodeControllerService nodeControllerService) {
- super("ShutdownHook-" + nodeControllerService.getId());
- this.nodeControllerService = nodeControllerService;
+ public ControllerShutdownHook(IServiceContext serviceCtx) {
+ super("ShutdownHook-" + serviceCtx.getControllerService().getId());
+ this.controllerService = serviceCtx.getControllerService();
}
@Override
@@ -46,7 +48,7 @@
} catch (Throwable th) {//NOSONAR
}
LOGGER.log(Level.DEBUG, () -> "Thread dump at shutdown: " + ThreadDumpUtil.takeDumpString());
- nodeControllerService.stop();
+ controllerService.stop();
} catch (Throwable th) { // NOSONAR... This is fine since this is shutdown hook
LOGGER.log(Level.WARN, "Exception in executing shutdown hook", th);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index 4f35aca..0de75d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -247,9 +247,6 @@
return configManager.toIni(false);
}
- // QQQ Note that clusterListenAddress is *not directly used* yet. Both
- // the cluster listener and the web server listen on "all interfaces".
- // This IP address is only used to instruct the NC on which IP to call in.
public String getClusterListenAddress() {
return getAppConfig().getString(Option.CLUSTER_LISTEN_ADDRESS);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java
index 26351e2..35d6393 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java
@@ -23,6 +23,5 @@
* @return true if all nodes ack shutdown
* @throws Exception
*/
- public boolean waitForCompletion() throws Exception;
-
+ boolean waitForCompletion() throws Exception;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
index e210963..0a54d1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
@@ -24,11 +24,13 @@
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import org.apache.hyracks.util.Span;
+
public class ShutdownRun implements IShutdownStatusConditionVariable {
private final Set<String> shutdownNodeIds = new TreeSet<>();
- private boolean shutdownSuccess = false;
- private static final long SHUTDOWN_TIMER_MS = TimeUnit.SECONDS.toMillis(30);
+ private boolean ccStopComplete = false;
+ private static final long SHUTDOWN_TIMEOUT_SECONDS = 60;
public ShutdownRun(Collection<String> nodeIds) {
shutdownNodeIds.addAll(nodeIds);
@@ -42,26 +44,38 @@
public synchronized void notifyShutdown(String nodeId) {
shutdownNodeIds.remove(nodeId);
if (shutdownNodeIds.isEmpty()) {
- shutdownSuccess = true;
notifyAll();
}
}
@Override
public synchronized boolean waitForCompletion() throws Exception {
- if (shutdownNodeIds.isEmpty()) {
- shutdownSuccess = true;
- } else {
+ Span span = Span.start(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ while (!span.elapsed()) {
+ if (shutdownNodeIds.isEmpty()) {
+ return true;
+ }
/*
- * Either be woken up when we're done, or default to fail.
+ * Either be woken up when we're done, or after (remaining) timeout has elapsed
*/
- wait(SHUTDOWN_TIMER_MS);
+ span.wait(this);
}
- return shutdownSuccess;
+ return false;
}
public synchronized Set<String> getRemainingNodes() {
return shutdownNodeIds;
}
+ public synchronized void notifyCcStopComplete() {
+ ccStopComplete = true;
+ notifyAll();
+ }
+
+ public synchronized boolean waitForCcStopCompletion() throws Exception {
+ while (!ccStopComplete) {
+ wait();
+ }
+ return true;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 882c396..b020cde 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -22,6 +22,7 @@
import java.util.Arrays;
import org.apache.hyracks.api.application.INCApplication;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.config.Section;
@@ -29,24 +30,28 @@
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.control.common.ControllerShutdownHook;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.ControllerConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.io.DefaultDeviceResolver;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.LoggingConfigUtil;
import org.apache.logging.log4j.Level;
public class BaseNCApplication implements INCApplication {
public static final BaseNCApplication INSTANCE = new BaseNCApplication();
+ protected INCServiceContext ncServiceCtx;
private ConfigManager configManager;
+ private Thread shutdownHook;
protected BaseNCApplication() {
}
@Override
public void init(IServiceContext serviceCtx) throws Exception {
- // no-op
+ ncServiceCtx = (INCServiceContext) serviceCtx;
}
@Override
@@ -58,7 +63,7 @@
@Override
public void startupCompleted() throws Exception {
- // no-op
+ installShutdownHook();
}
@Override
@@ -68,7 +73,7 @@
@Override
public void stop() throws Exception {
- // no-op
+ uninstallShutdownHook();
}
@Override
@@ -76,6 +81,21 @@
// no-op
}
+ protected Thread createShutdownHook() {
+ return new ControllerShutdownHook(ncServiceCtx);
+ }
+
+ protected void installShutdownHook() {
+ shutdownHook = createShutdownHook();
+ ExitUtil.registerShutdownHook(shutdownHook);
+ }
+
+ protected void uninstallShutdownHook() {
+ if (shutdownHook != null) {
+ ExitUtil.unregisterShutdownHook(shutdownHook);
+ }
+ }
+
@Override
public NodeCapacity getCapacity() {
int allCores = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index ac2ef83..bab729f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -187,8 +187,6 @@
ExitUtil.init();
}
- private NCShutdownHook ncShutdownHook;
-
+ /**
+ * Creates a node controller for {@code config}, using the application returned by {@code getApplication(config)}.
+ */
public NodeControllerService(NCConfig config) throws Exception {
this(config, getApplication(config));
}
@@ -210,9 +208,6 @@
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Setting uncaught exception handler " + getLifeCycleComponentManager());
}
- // Set shutdown hook before so it doesn't have the same uncaught exception handler
- ncShutdownHook = new NCShutdownHook(this);
- Runtime.getRuntime().addShutdownHook(ncShutdownHook);
Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()),
application.getFileDeviceResolver(), ncConfig.getIOParallelism(), ncConfig.getIOQueueSize());
@@ -488,54 +483,47 @@
+ /**
+ * Stops this node controller. In order: runs application pre-stop, shuts down the job executor (waiting up to
+ * 10 seconds), closes the partition managers, stops the network managers and work queue, stops the
+ * application, stops heartbeats, notifies every known CC, and finally stops IPC and closes the IO manager.
+ * A repeated call only logs (at error level) the stack trace of the original call and returns.
+ */
@Override
public synchronized void stop() throws Exception {
- if (shutdownCallStack == null) {
- shutdownCallStack = new Throwable().getStackTrace();
- LOGGER.log(Level.INFO, "Stopping NodeControllerService");
- application.preStop();
- executor.shutdownNow();
- if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
- LOGGER.log(Level.ERROR, "Some jobs failed to exit, continuing with abnormal shutdown");
- }
- partitionManager.close();
- resultPartitionManager.close();
- netManager.stop();
- resultNetworkManager.stop();
- if (messagingNetManager != null) {
- messagingNetManager.stop();
- }
- workQueue.stop();
- application.stop();
- /*
- * Stop heartbeats only after NC has stopped to avoid false node failure detection
- * on CC if an NC takes a long time to stop.
- */
- heartbeatManagers.values().parallelStream().forEach(HeartbeatManager::shutdown);
- synchronized (ccLock) {
- ccMap.values().parallelStream().forEach(cc -> {
- try {
- cc.getClusterControllerService().notifyShutdown(id);
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e);
- }
- });
- }
- ipc.stop();
- ioManager.close();
- LOGGER.log(Level.INFO, "Stopped NodeControllerService");
- } else {
- LOGGER.log(Level.ERROR, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
+ // a duplicate call reports where the first stop() came from and does nothing else
+ if (shutdownCallStack != null) {
+ LOGGER.error("Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
new Exception("Duplicate shutdown call"));
+ return;
}
- if (ncShutdownHook != null) {
- try {
- Runtime.getRuntime().removeShutdownHook(ncShutdownHook);
- LOGGER.info("removed shutdown hook for {}", id);
- } catch (IllegalStateException e) {
- LOGGER.log(Level.DEBUG, "ignoring exception while attempting to remove shutdown hook", e);
- }
+ // remember who initiated shutdown so a later duplicate call can report it
+ shutdownCallStack = new Throwable().getStackTrace();
+ LOGGER.info("Stopping NodeControllerService");
+ application.preStop();
+ executor.shutdownNow();
+ // give running jobs up to 10s to exit after shutdownNow(), then continue regardless
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOGGER.error("Some jobs failed to exit, continuing with abnormal shutdown");
}
+ partitionManager.close();
+ resultPartitionManager.close();
+ netManager.stop();
+ resultNetworkManager.stop();
+ if (messagingNetManager != null) {
+ messagingNetManager.stop();
+ }
+ workQueue.stop();
+ // JVM shutdown-hook removal no longer happens here; applications such as BaseNCApplication remove
+ // their own hook in stop()
+ application.stop();
+ /*
+ * Stop heartbeats only after NC has stopped to avoid false node failure detection
+ * on CC if an NC takes a long time to stop.
+ */
+ heartbeatManagers.values().parallelStream().forEach(HeartbeatManager::shutdown);
+ synchronized (ccLock) {
+ ccMap.values().parallelStream().forEach(cc -> {
+ try {
+ cc.getClusterControllerService().notifyShutdown(id);
+ } catch (Exception e) {
+ LOGGER.warn("Exception notifying CC of shutdown", e);
+ }
+ });
+ }
+ ipc.stop();
+ ioManager.close();
+ LOGGER.info("Stopped NodeControllerService");
}
+ /**
+ * @return the id of this node controller
+ */
+ @Override
public String getId() {
return id;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 58aa39e..eaae8e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -37,12 +37,14 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.api.network.ISocketChannelFactory;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.NetworkUtil;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,6 +54,7 @@
// TODO(mblow): the next two could be config parameters
private static final int INITIAL_RETRY_DELAY_MILLIS = 100;
private static final int MAX_RETRY_DELAY_MILLIS = 15000;
+ private static final int MAX_STOP_JOIN_WAIT_MILLIS = 30000;
private final IPCSystem system;
@@ -107,6 +110,11 @@
stopped = true;
NetworkUtil.closeQuietly(serverSocketChannel);
networkThread.selector.wakeup();
+ InvokeUtil.doUninterruptibly(() -> networkThread.join(MAX_STOP_JOIN_WAIT_MILLIS));
+ if (networkThread.isAlive()) {
+ LOGGER.warn("giving up after waiting {}s for networkThread to exit",
+ TimeUnit.MILLISECONDS.toSeconds(MAX_STOP_JOIN_WAIT_MILLIS));
+ }
}
IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int maxRetries) throws IOException, InterruptedException {
@@ -154,7 +162,14 @@
}
}
- synchronized void write(Message msg) {
+ /**
+ * Passes {@code msg} to {@link #write(Message)}, which enqueues it, provided this connection manager has not
+ * been stopped.
+ *
+ * @param msg the message to enqueue
+ * @throws IPCException if this connection manager has already been stopped
+ */
+ synchronized void send(Message msg) throws IPCException {
+ if (!stopped) {
+ write(msg);
+ } else {
+ throw new IPCException("ipc system has been stopped");
+ }
+ }
+
+ private synchronized void write(Message msg) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Enqueued message: " + msg);
}
@@ -216,9 +231,24 @@
processSelectedKeys();
}
} catch (Exception e) {
- LOGGER.log(Level.ERROR, "Exception processing message", e);
+ LOGGER.error("Exception processing message", e);
}
}
+ // process any last work we accepted prior to being stopped, before we terminate
+ collectOutstandingWork();
+ LOGGER.trace("had {} pending messages at stop time!", workingSendList.size());
+ if (!workingSendList.isEmpty()) {
+ sendPendingMessages();
+ }
+ try {
+ int n = selector.selectNow();
+ LOGGER.trace("had {} keys remaining at stop time!", n);
+ if (n > 0) {
+ processSelectedKeys();
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception processing message", e);
+ }
}
private void processSelectedKeys() {
@@ -340,6 +370,7 @@
IPCHandle handle = msg.getIPCHandle();
if (handle.getState() == HandleState.CLOSED) {
// message will never be sent
+ LOGGER.info("Could not send message: {}, due to {}", msg, handle);
return true;
}
if (handle.full()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index 25abfe1..ddcc677 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -81,7 +81,7 @@
msg.setFlag(Message.NORMAL);
msg.setPayload(req);
}
- system.getConnectionManager().write(msg);
+ system.getConnectionManager().send(msg);
return mid;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 680d55e..f8bc9f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -104,6 +104,28 @@
Runtime.getRuntime().halt(status);
}
+ /**
+ * Best-effort registration of a JVM shutdown hook.
+ *
+ * @param shutdownHook the thread to run when the JVM shuts down
+ * @return {@code true} if the hook was added; {@code false} if registration threw (the failure is logged at
+ *         warn level rather than propagated)
+ */
+ public static boolean registerShutdownHook(Thread shutdownHook) {
+ final Runtime runtime = Runtime.getRuntime();
+ try {
+ runtime.addShutdownHook(shutdownHook);
+ LOGGER.info("successfully registered shutdown hook {}", shutdownHook);
+ } catch (Exception e) {
+ LOGGER.warn("unable to register shutdown hook {}", shutdownHook, e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Best-effort removal of a JVM shutdown hook, e.g. one added via {@link #registerShutdownHook(Thread)}.
+ *
+ * @param shutdownHook the hook to remove; {@code null} is tolerated and reported as not removed
+ * @return {@code true} if the hook was registered and has now been removed; {@code false} otherwise, including
+ *         when the JVM is already shutting down or removal is not permitted
+ */
+ public static boolean unregisterShutdownHook(Thread shutdownHook) {
+ if (shutdownHook == null) {
+ return false;
+ }
+ try {
+ boolean success = Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ LOGGER.info("{}successfully removed shutdown hook {}", success ? "" : "un", shutdownHook);
+ return success;
+ } catch (IllegalStateException | SecurityException e) {
+ // IllegalStateException: shutdown already in progress; SecurityException: removal not permitted.
+ // Either way the hook cannot be removed, so report failure rather than abort the caller's shutdown.
+ LOGGER.debug("ignoring exception while attempting to remove shutdown hook", e);
+ return false;
+ }
+ }
+
private static class ShutdownWatchdog extends Thread {
private final Semaphore startSemaphore = new Semaphore(0);