[ASTERIXDB-1871][ASTERIXDB-2095] Stop Consumer Thread on Deallocate
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Currently, a MaterializingPipelinedPartition may be deallocated
before its consuming thread starts (e.g. due to a job abort). In that
case the consuming thread is never interrupted, which leaks threads
and files. With this change, the consumer thread checks whether the
partition was already deallocated before it starts reading; if so,
it cleans up any files and exits.
- Make TaskAttemptId a non-final class so that it can be mocked in tests.
- Add a test case (DeallocatableTest).
Change-Id: I18c9fb085c149f41a202fff83aa6ec3aaeba6a77
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2143
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index b039071..c162e14 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -577,5 +577,10 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-comm</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 03f42f5..d80eabc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -189,7 +189,7 @@
return num;
}
- private static void checkThreadLeaks() throws IOException {
+ public static void checkThreadLeaks() throws IOException {
String threadDump = ThreadDumpUtil.takeDumpJSONString();
// Currently we only do sanity check for threads used in the execution engine.
// Later we should check if there are leaked storage threads as well.
@@ -200,7 +200,7 @@
}
}
- private static void checkOpenRunFileLeaks() throws IOException {
+ public static void checkOpenRunFileLeaks() throws IOException {
if (SystemUtils.IS_OS_WINDOWS) {
return;
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
new file mode 100644
index 0000000..5ee0e9f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class DeallocatableTest {
+
+ @Before
+ public void setUp() throws Exception {
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @Test
+ public void deallocateBeforeConsumerStart() throws Exception {
+ TestNodeController nc = new TestNodeController(null, false);
+ try {
+ nc.init();
+ final NodeControllerService ncs =
+ (NodeControllerService) nc.getAppRuntimeContext().getServiceContext().getControllerService();
+ final TaskAttemptId taId = Mockito.mock(TaskAttemptId.class);
+ final IHyracksTaskContext ctx = nc.createTestContext(true);
+ final ConnectorDescriptorId codId = new ConnectorDescriptorId(1);
+ final PartitionId pid = new PartitionId(ctx.getJobletContext().getJobId(), codId, 1, 1);
+ final ChannelControlBlock ccb = ncs.getNetworkManager()
+ .connect(NetworkingUtil.getSocketAddress(ncs.getNetworkManager().getLocalNetworkAddress()));
+ final NetworkOutputChannel networkOutputChannel = new NetworkOutputChannel(ccb, 0);
+ final MaterializingPipelinedPartition mpp =
+ new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), pid, taId, ncs.getExecutor());
+ mpp.open();
+ // fill and write frame
+ final ByteBuffer frame = ctx.allocateFrame();
+ while (frame.hasRemaining()) {
+ frame.put((byte) 0);
+ }
+ frame.flip();
+ mpp.nextFrame(frame);
+ // close and deallocate before consumer thread starts
+ mpp.close();
+ mpp.deallocate();
+ // start the consumer thread after deallocate
+ mpp.writeTo(networkOutputChannel);
+ // give consumer thread chance to exit
+ TimeUnit.MILLISECONDS.sleep(100);
+ LangExecutionUtil.checkThreadLeaks();
+ LangExecutionUtil.checkOpenRunFileLeaks();
+ } finally {
+ nc.deInit();
+ }
+ }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 62c1e4a..c93920f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -23,13 +23,17 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
+import java.net.SocketAddress;
import java.net.SocketException;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.util.Enumeration;
+import org.apache.hyracks.api.comm.NetworkAddress;
+
public class NetworkingUtil {
private NetworkingUtil() {
@@ -119,4 +123,8 @@
int port = socketChannel.socket().getPort();
return InetSocketAddress.createUnresolved(hostAddress, port);
}
+
+ public static SocketAddress getSocketAddress(NetworkAddress netAddr) throws UnknownHostException {
+ return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort());
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
index 782561b..bb3b3c8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
@@ -25,7 +25,7 @@
import org.apache.hyracks.api.io.IWritable;
-public final class TaskAttemptId implements IWritable, Serializable {
+public class TaskAttemptId implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
private TaskId taskId;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 5506a94..3582da2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -38,31 +38,19 @@
private static final Logger LOGGER = Logger.getLogger(MaterializingPipelinedPartition.class.getName());
private final IHyracksTaskContext ctx;
-
private final Executor executor;
-
private final IIOManager ioManager;
-
private final PartitionManager manager;
-
private final PartitionId pid;
-
private final TaskAttemptId taId;
-
private FileReference fRef;
-
private IFileHandle writeHandle;
-
private long size;
-
private boolean eos;
-
private boolean failed;
-
protected boolean flushRequest;
-
+ private boolean deallocated;
private Level openCloseLevel = Level.FINE;
-
private Thread dataConsumerThread;
public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
@@ -89,6 +77,7 @@
if (dataConsumerThread != null) {
dataConsumerThread.interrupt();
}
+ deallocated = true;
}
@Override
@@ -109,14 +98,19 @@
fRefCopy = fRef;
}
writer.open();
- IFileHandle readHandle = fRefCopy == null ? null
- : ioManager.open(fRefCopy, IIOManager.FileReadWriteMode.READ_ONLY,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ IFileHandle readHandle = fRefCopy == null ? null :
+ ioManager.open(fRefCopy, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
try {
if (readHandle == null) {
// Either fail() is called or close() is called with 0 tuples coming in.
return;
}
+ synchronized (MaterializingPipelinedPartition.this) {
+ if (deallocated) {
+ return;
+ }
+ }
long offset = 0;
ByteBuffer buffer = ctx.allocateFrame();
boolean done = false;
@@ -192,6 +186,7 @@
size = 0;
eos = false;
failed = false;
+ deallocated = false;
manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
}