[ASTERIXDB-2236][NET] Reestablish Nc2Nc Closed Connection
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Enable keep alive on Nc2Nc sockets.
- Open Nc2Nc connection when closed.
- Add "port" command to test framework to
ensure a port is not used anymore.
- Add test case for reestablishing connection
between nodes.
Change-Id: I3af4af37de6c1b36d7032c78ee60f1e5ca5f7c1c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2273
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index b49bd8f..30336d1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -32,6 +32,7 @@
import java.io.StringWriter;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
@@ -1142,6 +1143,10 @@
command = stripJavaComments(statement).trim().split(" ");
executeStorageCommand(command);
break;
+ case "port":
+ command = stripJavaComments(statement).trim().split(" ");
+ handlePortCommand(command);
+ break;
default:
throw new IllegalArgumentException("No statements of type " + ctx.getType());
}
@@ -1748,6 +1753,31 @@
return replicationAddress.get(nodeId);
}
+ private void handlePortCommand(String[] command) throws InterruptedException, TimeoutException {
+ if (command.length != 3) {
+ throw new IllegalStateException("Unrecognized port command. Expected (host port timeout(sec))");
+ }
+ String host = command[0];
+ int port = Integer.parseInt(command[1]);
+ int timeoutSec = Integer.parseInt(command[2]);
+ while (isPortActive(host, port)) {
+ TimeUnit.SECONDS.sleep(1);
+ timeoutSec--;
+ if (timeoutSec <= 0) {
+ throw new TimeoutException(
+ "Port is still in use: " + host + ":" + port + " after " + command[2] + " secs");
+ }
+ }
+ }
+
+ private boolean isPortActive(String host, int port) {
+ try (Socket ignored = new Socket(host, port)) {
+ return true;
+ } catch (IOException ignored) {
+ return false;
+ }
+ }
+
abstract static class TestLoop extends Exception {
private final String target;
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NcLifecycleIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NcLifecycleIT.java
new file mode 100644
index 0000000..f69f849
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NcLifecycleIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.server.test;
+
+import static org.apache.asterix.server.test.NCServiceExecutionIT.APP_HOME;
+import static org.apache.asterix.server.test.NCServiceExecutionIT.ASTERIX_APP_DIR;
+import static org.apache.asterix.server.test.NCServiceExecutionIT.INSTANCE_DIR;
+import static org.apache.asterix.server.test.NCServiceExecutionIT.LOG_DIR;
+import static org.apache.asterix.server.test.NCServiceExecutionIT.TARGET_DIR;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.test.base.RetainLogsRule;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.server.process.HyracksVirtualCluster;
+import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class NcLifecycleIT {
+
+ private static final String PATH_BASE =
+ FileUtil.joinPath("src", "test", "resources", "integrationts", "NcLifecycle");
+ private static final String CONF_DIR =
+ StringUtils.join(new String[] { TARGET_DIR, "test-classes", "NcLifecycleIT" }, File.separator);
+ private static final String PATH_ACTUAL = FileUtil.joinPath("target", "ittest");
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static String reportPath = new File(FileUtil.joinPath("target", "failsafe-reports")).getAbsolutePath();
+ private static final TestExecutor testExecutor = new TestExecutor();
+ private static HyracksVirtualCluster cluster;
+
+ private TestCaseContext tcCtx;
+
+ public NcLifecycleIT(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Rule public TestRule retainLogs = new RetainLogsRule(NCServiceExecutionIT.ASTERIX_APP_DIR, reportPath, this);
+
+ @Before
+ public void before() throws Exception {
+ LOGGER.info("Creating new instance...");
+ File instanceDir = new File(INSTANCE_DIR);
+ if (instanceDir.isDirectory()) {
+ FileUtils.deleteDirectory(instanceDir);
+ }
+
+ cluster = new HyracksVirtualCluster(new File(APP_HOME), new File(ASTERIX_APP_DIR));
+ cluster.addNCService(new File(CONF_DIR, "ncservice1.conf"), new File(LOG_DIR, "ncservice1.log"));
+ cluster.addNCService(new File(CONF_DIR, "ncservice2.conf"), new File(LOG_DIR, "ncservice2.log"));
+
+ // Start CC
+ cluster.start(new File(CONF_DIR, "cc.conf"), new File(LOG_DIR, "cc.log"));
+ LOGGER.info("Instance created.");
+ testExecutor.waitForClusterActive(30, TimeUnit.SECONDS);
+ LOGGER.info("Instance is in ACTIVE state.");
+ }
+
+ @After
+ public void after() {
+ LOGGER.info("Destroying instance...");
+ cluster.stop();
+ LOGGER.info("Instance destroyed.");
+ }
+
+ @Test
+ public void test() throws Exception {
+ testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false);
+ }
+
+ @Parameterized.Parameters(name = "NcLifecycleIT {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ Collection<Object[]> testArgs = buildTestsInXml(TestCaseContext.DEFAULT_TESTSUITE_XML_NAME);
+ if (testArgs.size() == 0) {
+ testArgs = buildTestsInXml(TestCaseContext.DEFAULT_TESTSUITE_XML_NAME);
+ }
+ return testArgs;
+ }
+
+ private static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+ Collection<Object[]> testArgs = new ArrayList<>();
+ TestCaseContext.Builder b = new TestCaseContext.Builder();
+ for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+ testArgs.add(new Object[] { ctx });
+ }
+ return testArgs;
+ }
+}
diff --git a/asterixdb/asterix-server/src/test/resources/NcLifecycleIT/cc.conf b/asterixdb/asterix-server/src/test/resources/NcLifecycleIT/cc.conf
new file mode 100644
index 0000000..e28c41a
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/NcLifecycleIT/cc.conf
@@ -0,0 +1,48 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=../asterix-server/target/tmp/asterix_nc1/txnlog
+core.dump.dir=../asterix-server/target/tmp/asterix_nc1/coredump
+iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006
+nc.api.port=19004
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=../asterix-server/target/tmp/asterix_nc2/txnlog
+core.dump.dir=../asterix-server/target/tmp/asterix_nc2/coredump
+iodevices=../asterix-server/target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007
+nc.api.port=19005
+data.listen.port=12345
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.subdir=test_storage
+storage.memorycomponent.globalbudget = 1073741824
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+
+[common]
+log.level = INFO
diff --git a/asterixdb/asterix-server/src/test/resources/NcLifecycleIT/ncservice1.conf b/asterixdb/asterix-server/src/test/resources/NcLifecycleIT/ncservice1.conf
new file mode 100644
index 0000000..ba10142
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/NcLifecycleIT/ncservice1.conf
@@ -0,0 +1,20 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[ncservice]
+logdir=../asterix-server/target/failsafe-reports
+
diff --git a/asterixdb/asterix-server/src/test/resources/NcLifecycleIT/ncservice2.conf b/asterixdb/asterix-server/src/test/resources/NcLifecycleIT/ncservice2.conf
new file mode 100644
index 0000000..2036584
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/NcLifecycleIT/ncservice2.conf
@@ -0,0 +1,21 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[ncservice]
+logdir=../asterix-server/target/failsafe-reports
+port=9091
+
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.1.ddl.sqlpp b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.1.ddl.sqlpp
new file mode 100644
index 0000000..15bc3c5
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.1.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+CREATE TYPE MyType AS {
+ id : int
+};
+
+CREATE DATASET ds_1(MyType) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.2.query.sqlpp b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.2.query.sqlpp
new file mode 100644
index 0000000..ef836f4
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.2.query.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+select value count(*) from ds_1;
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.3.node.cmd b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.3.node.cmd
new file mode 100644
index 0000000..a04c093
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.3.node.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+kill asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.4.pollget.http b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.4.pollget.http
new file mode 100644
index 0000000..777e3dd
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.4.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=60
+
+/admin/cluster/summary
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.5.port.cmd b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.5.port.cmd
new file mode 100644
index 0000000..60acc69
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.5.port.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+localhost 12345 30
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.6.node.cmd b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.6.node.cmd
new file mode 100644
index 0000000..26a6503
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.6.node.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+start asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.7.pollget.http b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.7.pollget.http
new file mode 100644
index 0000000..777e3dd
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.7.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=60
+
+/admin/cluster/summary
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.8.ddl.sqlpp b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.8.ddl.sqlpp
new file mode 100644
index 0000000..f96d5a8
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.8.ddl.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+CREATE DATASET ds_2(MyType) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.9.query.sqlpp b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.9.query.sqlpp
new file mode 100644
index 0000000..4deaacf
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/queries/networking/reuse_data_port/reuse_data_port.9.query.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+select value count(*) from ds_2;
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.3.adm b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.3.adm
new file mode 100644
index 0000000..c227083
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.3.adm
@@ -0,0 +1 @@
+0
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.5.adm b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.5.adm
new file mode 100644
index 0000000..8d5d123
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.5.adm
@@ -0,0 +1,38 @@
+{
+ "metadata_node" : "asterix_nc1",
+ "partitions" : {
+ "0" : {
+ "active" : true,
+ "activeNodeId" : "asterix_nc1",
+ "iodeviceNum" : 0,
+ "nodeId" : "asterix_nc1",
+ "partitionId" : 0,
+ "pendingActivation" : false
+ },
+ "1" : {
+ "active" : true,
+ "activeNodeId" : "asterix_nc1",
+ "iodeviceNum" : 1,
+ "nodeId" : "asterix_nc1",
+ "partitionId" : 1,
+ "pendingActivation" : false
+ },
+ "2" : {
+ "active" : false,
+ "activeNodeId" : "asterix_nc2",
+ "iodeviceNum" : 0,
+ "nodeId" : "asterix_nc2",
+ "partitionId" : 2,
+ "pendingActivation" : false
+ },
+ "3" : {
+ "active" : false,
+ "activeNodeId" : "asterix_nc2",
+ "iodeviceNum" : 1,
+ "nodeId" : "asterix_nc2",
+ "partitionId" : 3,
+ "pendingActivation" : false
+ }
+ },
+ "state" : "UNUSABLE"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.7.adm b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.7.adm
new file mode 100644
index 0000000..c0697b7
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.7.adm
@@ -0,0 +1,38 @@
+{
+ "metadata_node" : "asterix_nc1",
+ "partitions" : {
+ "0" : {
+ "active" : true,
+ "activeNodeId" : "asterix_nc1",
+ "iodeviceNum" : 0,
+ "nodeId" : "asterix_nc1",
+ "partitionId" : 0,
+ "pendingActivation" : false
+ },
+ "1" : {
+ "active" : true,
+ "activeNodeId" : "asterix_nc1",
+ "iodeviceNum" : 1,
+ "nodeId" : "asterix_nc1",
+ "partitionId" : 1,
+ "pendingActivation" : false
+ },
+ "2" : {
+ "active" : true,
+ "activeNodeId" : "asterix_nc2",
+ "iodeviceNum" : 0,
+ "nodeId" : "asterix_nc2",
+ "partitionId" : 2,
+ "pendingActivation" : false
+ },
+ "3" : {
+ "active" : true,
+ "activeNodeId" : "asterix_nc2",
+ "iodeviceNum" : 1,
+ "nodeId" : "asterix_nc2",
+ "partitionId" : 3,
+ "pendingActivation" : false
+ }
+ },
+ "state" : "ACTIVE"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.8.adm b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.8.adm
new file mode 100644
index 0000000..c227083
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/results/networking/reuse_data_port/reuse_data_port.8.adm
@@ -0,0 +1 @@
+0
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/testsuite.xml b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/testsuite.xml
new file mode 100644
index 0000000..c1e5f14
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/integrationts/NcLifecycle/testsuite.xml
@@ -0,0 +1,27 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".sql">
+ <test-group name="networking">
+ <test-case FilePath="networking">
+ <compilation-unit name="reuse_data_port">
+ <output-dir compare="Text">reuse_data_port</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index f6261da..c12909c 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -107,6 +107,13 @@
}
}
}
+
+ @Override
+ public void connectionClosed(TCPConnection connection) {
+ synchronized (MuxDemux.this) {
+ connectionMap.remove(connection.getRemoteAddress());
+ }
+ }
}, nThreads);
perfCounters = new MuxDemuxPerformanceCounters();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/ITCPConnectionListener.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/ITCPConnectionListener.java
index b801cd3..afb78b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/ITCPConnectionListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/ITCPConnectionListener.java
@@ -22,9 +22,16 @@
import java.net.InetSocketAddress;
public interface ITCPConnectionListener {
- public void acceptedConnection(TCPConnection connection);
+ void acceptedConnection(TCPConnection connection);
- public void connectionEstablished(TCPConnection connection);
+ void connectionEstablished(TCPConnection connection);
- public void connectionFailure(InetSocketAddress remoteAddress, IOException error);
+ void connectionFailure(InetSocketAddress remoteAddress, IOException error);
+
+ /**
+ * Notifies the listener that {@code connection} has been closed.
+ *
+ * @param connection
+ */
+ void connectionClosed(TCPConnection connection);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
index ce0bbbe..29afc6d 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
@@ -24,11 +24,17 @@
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
public class TCPConnection {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
private final TCPEndpoint endpoint;
private final SocketChannel channel;
-
+ private final InetSocketAddress remoteAddress;
private final SelectionKey key;
private final Selector selector;
@@ -42,6 +48,7 @@
this.channel = channel;
this.key = key;
this.selector = selector;
+ remoteAddress = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
}
public TCPEndpoint getEndpoint() {
@@ -57,7 +64,7 @@
}
public InetSocketAddress getRemoteAddress() {
- return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
+ return remoteAddress;
}
public void enable(int ops) {
@@ -86,12 +93,12 @@
this.attachment = attachment;
}
- public void close() {
+ public synchronized void close() {
key.cancel();
try {
channel.close();
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error(() -> "Error closing channel at: " + remoteAddress, e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index 52f4c30..4633cf3 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -31,7 +31,13 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
public class TCPEndpoint {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
private final ITCPConnectionListener connectionListener;
private final int nThreads;
@@ -107,10 +113,10 @@
super("TCPEndpoint IO Thread");
setDaemon(true);
setPriority(Thread.NORM_PRIORITY);
- this.pendingConnections = new ArrayList<InetSocketAddress>();
- this.workingPendingConnections = new ArrayList<InetSocketAddress>();
- this.incomingConnections = new ArrayList<SocketChannel>();
- this.workingIncomingConnections = new ArrayList<SocketChannel>();
+ this.pendingConnections = new ArrayList<>();
+ this.workingPendingConnections = new ArrayList<>();
+ this.incomingConnections = new ArrayList<>();
+ this.workingIncomingConnections = new ArrayList<>();
selector = Selector.open();
}
@@ -124,6 +130,7 @@
for (InetSocketAddress address : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
+ channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.configureBlocking(false);
boolean connect = false;
boolean failure = false;
@@ -150,6 +157,7 @@
if (!workingIncomingConnections.isEmpty()) {
for (SocketChannel channel : workingIncomingConnections) {
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
+ channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.configureBlocking(false);
SelectionKey sKey = channel.register(selector, 0);
TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
@@ -174,8 +182,10 @@
try {
connection.getEventListener().notifyIOReady(connection, readable, writable);
} catch (Exception e) {
+ LOGGER.error("Unexpected tcp io error", e);
connection.getEventListener().notifyIOError(e);
connection.close();
+ connectionListener.connectionClosed(connection);
continue;
}
}
@@ -201,7 +211,7 @@
}
}
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.error("Error in TCPEndpoint " + localAddress, e);
}
}
}