[NO ISSUE][STO] Move the IO threads from BufferCache to IOManager
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Move the IO threads from BufferCache to IOManager to cover all
IO uses that go through the IOManager.
Change-Id: Ic02b456826ae7abc2619a7eec3f90b48717b0adb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2417
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 4d26d25..6c4d068 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -429,7 +429,7 @@
public IHyracksTaskContext createTestContext(JobId jobId, int partition, boolean withMessaging)
throws HyracksDataException {
- IHyracksTaskContext ctx = TestUtils.create(KB32);
+ IHyracksTaskContext ctx = TestUtils.create(KB32, ExecutionTestUtil.integrationUtil.ncs[0].getIoManager());
if (withMessaging) {
TaskUtil.put(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 25c71df..306a2a5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -32,9 +32,9 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -178,7 +178,7 @@
}
private Throwable finish(Throwable failure) {
- Throwable th = ResourceReleaseUtils.close(recordReader, null);
+ Throwable th = CleanupUtils.close(recordReader, null);
th = DataflowUtils.close(tupleForwarder, th);
closeSignal();
setState(State.STOPPED);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index 471d23f..fb9a4a6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -25,7 +25,7 @@
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -103,11 +103,11 @@
@Override
public synchronized void close() throws IOException {
- Throwable failure = ResourceReleaseUtils.close(connectionStream, null);
+ Throwable failure = CleanupUtils.close(connectionStream, null);
connectionStream = null;
- failure = ResourceReleaseUtils.close(socket, failure);
+ failure = CleanupUtils.close(socket, failure);
socket = null;
- failure = ResourceReleaseUtils.close(server, failure);
+ failure = CleanupUtils.close(server, failure);
server = null;
if (failure != null) {
throw HyracksDataException.create(failure);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOFuture.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IAsyncRequest.java
similarity index 80%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOFuture.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IAsyncRequest.java
index 3de2ca1..3efb2e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOFuture.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IAsyncRequest.java
@@ -18,10 +18,9 @@
*/
package org.apache.hyracks.api.io;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+@FunctionalInterface
+public interface IAsyncRequest {
-public interface IIOFuture {
- public int synchronize() throws HyracksDataException, InterruptedException;
+ void await() throws InterruptedException;
- public boolean isComplete();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index b0cc07a..ff1e47f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -18,13 +18,13 @@
*/
package org.apache.hyracks.api.io;
+import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.concurrent.Executor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public interface IIOManager {
+public interface IIOManager extends Closeable {
public enum FileReadWriteMode {
READ_ONLY,
READ_WRITE
@@ -47,16 +47,16 @@
public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
- public IIOFuture asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data);
+ IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
- public IIOFuture asyncRead(IFileHandle fHandle, long offset, ByteBuffer data);
+ IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException;
+
+ IAsyncRequest asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
public void close(IFileHandle fHandle) throws HyracksDataException;
public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
- public void setExecutor(Executor executor);
-
public long getSize(IFileHandle fileHandle);
public void deleteWorkspaceFiles() throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index a67c133..008cffd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -41,8 +41,8 @@
} catch (Throwable th) { // NOSONAR. Had to be done to satisfy contracts
try {
LOGGER.log(Level.WARN, "Failure destroying a destroyable resource", th);
- } catch (Throwable ignore) {
- // Do nothing
+ } catch (Throwable ignore) { // NOSONAR: Ignore catching Throwable
+ // NOSONAR Ignore logging failure
}
root = ExceptionUtils.suppress(root, th);
}
@@ -66,11 +66,11 @@
if (writer != null) {
try {
writer.close();
- } catch (Throwable th) { // NOSONAR Will be re-thrown
+ } catch (Throwable th) { // NOSONAR Will be suppressed
try {
LOGGER.log(Level.WARN, "Failure closing a closeable resource", th);
- } catch (Throwable loggingFailure) {
- // Do nothing
+ } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
+ // NOSONAR: Ignore logging failure
}
root = ExceptionUtils.suppress(root, th);
}
@@ -90,13 +90,39 @@
public static void fail(IFrameWriter writer, Throwable root) {
try {
writer.fail();
- } catch (Throwable th) { // NOSONAR Will be re-thrown
+ } catch (Throwable th) { // NOSONAR Will be suppressed
try {
LOGGER.log(Level.WARN, "Failure failing " + writer.getClass().getSimpleName(), th);
- } catch (Throwable loggingFailure) {
- // Do nothing
+ } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
+ // NOSONAR ignore logging failure
}
root.addSuppressed(th);
}
}
+
+ /**
+ * Close the AutoCloseable and suppress any Throwable thrown by the close call.
+ * This method must NEVER throw any Throwable
+ *
+ * @param closable
+ * the resource to close
+ * @param root
+ * the first exception encountered during release of resources
+ * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null
+ */
+ public static Throwable close(AutoCloseable closable, Throwable root) {
+ if (closable != null) {
+ try {
+ closable.close();
+ } catch (Throwable th) { // NOSONAR Will be suppressed
+ try {
+ LOGGER.log(Level.WARN, "Failure closing a closeable resource", th);
+ } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
+ // NOSONAR ignore logging failure
+ }
+ root = ExceptionUtils.suppress(root, th); // NOSONAR
+ }
+ }
+ return root;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0ccef1d..0b7254d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -63,6 +63,7 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.control.common.base.IClusterController;
import org.apache.hyracks.control.common.config.ConfigManager;
@@ -218,21 +219,26 @@
Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
ioManager =
new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
-
- workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
- jobletMap = new ConcurrentHashMap<>();
- deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
- timer = new Timer(true);
- serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
- new File(new File(NodeControllerService.class.getName()), id));
- memoryMXBean = ManagementFactory.getMemoryMXBean();
- gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
- threadMXBean = ManagementFactory.getThreadMXBean();
- runtimeMXBean = ManagementFactory.getRuntimeMXBean();
- osMXBean = ManagementFactory.getOperatingSystemMXBean();
- getNodeControllerInfosAcceptor = new MutableObject<>();
- memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
- ioCounter = IOCounterFactory.INSTANCE.getIOCounter();
+ try {
+ workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
+ jobletMap = new ConcurrentHashMap<>();
+ deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
+ timer = new Timer(true);
+ serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
+ new File(new File(NodeControllerService.class.getName()), id));
+ memoryMXBean = ManagementFactory.getMemoryMXBean();
+ gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
+ threadMXBean = ManagementFactory.getThreadMXBean();
+ runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+ osMXBean = ManagementFactory.getOperatingSystemMXBean();
+ getNodeControllerInfosAcceptor = new MutableObject<>();
+ memoryManager =
+ new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
+ ioCounter = IOCounterFactory.INSTANCE.getIOCounter();
+ } catch (Throwable th) { // NOSONAR will be re-thrown
+ CleanupUtils.close(ioManager, th);
+ throw th;
+ }
}
public IOManager getIoManager() {
@@ -271,7 +277,6 @@
}
private void init() throws Exception {
- ioManager.setExecutor(executor);
datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.getResultManagerMemory(),
ncConfig.getResultTTL(), ncConfig.getResultSweepThreshold());
datasetNetworkManager = new DatasetNetworkManager(ncConfig.getResultListenAddress(),
@@ -526,7 +531,7 @@
});
}
ipc.stop();
-
+ ioManager.close();
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
} else {
LOGGER.log(Level.ERROR, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 2742aaa..a527e92 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -30,43 +30,49 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.io.IFileHandle;
-import org.apache.hyracks.api.io.IIOFuture;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.IoRequest.State;
import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class IOManager implements IIOManager {
/*
* Constants
*/
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final int IO_REQUEST_QUEUE_SIZE = 100; // TODO: Make configurable
private static final String WORKSPACE_FILE_SUFFIX = ".waf";
private static final FilenameFilter WORKSPACE_FILES_FILTER = (dir, name) -> name.endsWith(WORKSPACE_FILE_SUFFIX);
/*
* Finals
*/
+ private final ExecutorService executor;
+ private final BlockingQueue<IoRequest> submittedRequests;
+ private final BlockingQueue<IoRequest> freeRequests;
private final List<IODeviceHandle> ioDevices;
private final List<IODeviceHandle> workspaces;
/*
* Mutables
*/
- private Executor executor;
private int workspaceIndex;
private IFileDeviceResolver deviceComputer;
- public IOManager(List<IODeviceHandle> devices, Executor executor, IFileDeviceResolver deviceComputer)
- throws HyracksDataException {
- this(devices, deviceComputer);
- this.executor = executor;
- }
-
public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer) throws HyracksDataException {
this.ioDevices = Collections.unmodifiableList(devices);
checkDeviceValidity(devices);
@@ -86,6 +92,21 @@
}
workspaceIndex = 0;
this.deviceComputer = deviceComputer;
+ submittedRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
+ freeRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
+ int numIoThreads = ioDevices.size() * 2;
+ executor = Executors.newFixedThreadPool(numIoThreads);
+ for (int i = 0; i < numIoThreads; i++) {
+ executor.execute(new IoRequestHandler(i, submittedRequests));
+ }
+ }
+
+ public IoRequest getOrAllocRequest() {
+ IoRequest request = freeRequests.poll();
+ if (request == null) {
+ request = new IoRequest(this, submittedRequests, freeRequests);
+ }
+ return request;
}
private void checkDeviceValidity(List<IODeviceHandle> devices) throws HyracksDataException {
@@ -106,11 +127,6 @@
}
@Override
- public void setExecutor(Executor executor) {
- this.executor = executor;
- }
-
- @Override
public List<IODeviceHandle> getIODevices() {
return ioDevices;
}
@@ -129,6 +145,39 @@
@Override
public int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ IoRequest req = asyncWrite(fHandle, offset, data);
+ InvokeUtil.doUninterruptibly(req);
+ try {
+ if (req.getState() == State.OPERATION_SUCCEEDED) {
+ return req.getWrite();
+ } else if (req.getState() == State.OPERATION_FAILED) {
+ throw req.getFailure();
+ } else {
+ throw new IllegalStateException("Write request completed with state " + req.getState());
+ }
+ } finally {
+ req.recycle();
+ }
+ }
+
+ @Override
+ public long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
+ IoRequest req = asyncWrite(fHandle, offset, dataArray);
+ InvokeUtil.doUninterruptibly(req);
+ try {
+ if (req.getState() == State.OPERATION_SUCCEEDED) {
+ return req.getWrites();
+ } else if (req.getState() == State.OPERATION_FAILED) {
+ throw req.getFailure();
+ } else {
+ throw new IllegalStateException("Write request completed with state " + req.getState());
+ }
+ } finally {
+ req.recycle();
+ }
+ }
+
+ public int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
try {
if (fHandle == null) {
throw new IllegalStateException("Trying to write to a deleted file.");
@@ -152,8 +201,7 @@
}
}
- @Override
- public long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
+ public long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
try {
if (fHandle == null) {
throw new IllegalStateException("Trying to write to a deleted file.");
@@ -197,6 +245,22 @@
*/
@Override
public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ IoRequest req = asyncRead(fHandle, offset, data);
+ InvokeUtil.doUninterruptibly(req);
+ try {
+ if (req.getState() == State.OPERATION_SUCCEEDED) {
+ return req.getRead();
+ } else if (req.getState() == State.OPERATION_FAILED) {
+ throw req.getFailure();
+ } else {
+ throw new IllegalStateException("Reqd request completed with state " + req.getState());
+ }
+ } finally {
+ req.recycle();
+ }
+ }
+
+ public int doSyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
try {
int n = 0;
int remaining = data.remaining();
@@ -223,16 +287,38 @@
}
@Override
- public IIOFuture asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) {
- AsyncWriteRequest req = new AsyncWriteRequest((FileHandle) fHandle, offset, data);
- executor.execute(req);
+ public IoRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
+ IoRequest req = getOrAllocRequest();
+ try {
+ req.write(fHandle, offset, dataArray);
+ } catch (HyracksDataException e) {
+ req.recycle();
+ throw e;
+ }
return req;
}
@Override
- public IIOFuture asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) {
- AsyncReadRequest req = new AsyncReadRequest((FileHandle) fHandle, offset, data);
- executor.execute(req);
+ public IoRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ IoRequest req = getOrAllocRequest();
+ try {
+ req.write(fHandle, offset, data);
+ } catch (HyracksDataException e) {
+ req.recycle();
+ throw e;
+ }
+ return req;
+ }
+
+ @Override
+ public IoRequest asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ IoRequest req = getOrAllocRequest();
+ try {
+ req.read(fHandle, offset, data);
+ } catch (HyracksDataException e) {
+ req.recycle();
+ throw e;
+ }
return req;
}
@@ -259,80 +345,6 @@
return dev.createFileRef(waPath + File.separator + waf.getName());
}
- private abstract class AsyncRequest implements IIOFuture, Runnable {
- protected final FileHandle fHandle;
- protected final long offset;
- protected final ByteBuffer data;
- private boolean complete;
- private HyracksDataException exception;
- private int result;
-
- private AsyncRequest(FileHandle fHandle, long offset, ByteBuffer data) {
- this.fHandle = fHandle;
- this.offset = offset;
- this.data = data;
- complete = false;
- exception = null;
- }
-
- @Override
- public void run() {
- HyracksDataException hde = null;
- int res = -1;
- try {
- res = performOperation();
- } catch (HyracksDataException e) {
- hde = e;
- }
- synchronized (this) {
- exception = hde;
- result = res;
- complete = true;
- notifyAll();
- }
- }
-
- protected abstract int performOperation() throws HyracksDataException;
-
- @Override
- public synchronized int synchronize() throws HyracksDataException, InterruptedException {
- while (!complete) {
- wait();
- }
- if (exception != null) {
- throw exception;
- }
- return result;
- }
-
- @Override
- public synchronized boolean isComplete() {
- return complete;
- }
- }
-
- private class AsyncReadRequest extends AsyncRequest {
- private AsyncReadRequest(FileHandle fHandle, long offset, ByteBuffer data) {
- super(fHandle, offset, data);
- }
-
- @Override
- protected int performOperation() throws HyracksDataException {
- return syncRead(fHandle, offset, data);
- }
- }
-
- private class AsyncWriteRequest extends AsyncRequest {
- private AsyncWriteRequest(FileHandle fHandle, long offset, ByteBuffer data) {
- super(fHandle, offset, data);
- }
-
- @Override
- protected int performOperation() throws HyracksDataException {
- return syncWrite(fHandle, offset, data);
- }
- }
-
@Override
public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
try {
@@ -390,4 +402,18 @@
}
return null;
}
+
+ @Override
+ public void close() throws IOException {
+ InvokeUtil.doUninterruptibly(() -> submittedRequests.put(IoRequestHandler.POISON_PILL));
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOGGER.log(Level.WARN, "Failure shutting down {} executor service", getClass().getSimpleName());
+ }
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Interrupted while shutting down {} executor service", getClass().getSimpleName());
+ Thread.currentThread().interrupt();
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java
new file mode 100644
index 0000000..4235d19
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IAsyncRequest;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.util.InvokeUtil.InterruptibleAction;
+
+public class IoRequest implements IAsyncRequest, InterruptibleAction {
+
+ public enum State {
+ INITIAL,
+ READ_REQUESTED,
+ WRITE_REQUESTED,
+ OPERATION_FAILED,
+ OPERATION_SUCCEEDED
+ }
+
+ private final IOManager ioManager;
+ private final BlockingQueue<IoRequest> submittedRequests;
+ private final BlockingQueue<IoRequest> freeRequests;
+ private State state;
+ private IFileHandle fHandle;
+ private long offset;
+ private ByteBuffer data;
+ private ByteBuffer[] dataArray;
+ private HyracksDataException failure;
+ private int read;
+ private int write;
+ private long writes;
+
+ public IoRequest(IOManager ioManager, BlockingQueue<IoRequest> submittedRequests,
+ BlockingQueue<IoRequest> freeRequests) {
+ this.ioManager = ioManager;
+ this.submittedRequests = submittedRequests;
+ this.freeRequests = freeRequests;
+ reset();
+ }
+
+ public void reset() {
+ state = State.INITIAL;
+ fHandle = null;
+ data = null;
+ dataArray = null;
+ failure = null;
+ }
+
+ public void read(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ if (state != State.INITIAL) {
+ throw new IllegalStateException("Can't request a read operation through a " + state + " request");
+ }
+ state = State.READ_REQUESTED;
+ this.fHandle = fHandle;
+ this.offset = offset;
+ this.data = data;
+ queue();
+ }
+
+ public void write(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
+ if (state != State.INITIAL) {
+ throw new IllegalStateException("Can't request a write operation through a " + state + " request");
+ }
+ state = State.WRITE_REQUESTED;
+ this.fHandle = fHandle;
+ this.offset = offset;
+ this.dataArray = dataArray;
+ queue();
+ }
+
+ public void write(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ if (state != State.INITIAL) {
+ throw new IllegalStateException("Can't request a write operation through a " + state + " request");
+ }
+ state = State.WRITE_REQUESTED;
+ this.fHandle = fHandle;
+ this.offset = offset;
+ this.data = data;
+ queue();
+ }
+
+ private void queue() throws HyracksDataException {
+ try {
+ submittedRequests.put(this);
+ } catch (InterruptedException e) { // NOSONAR: The call below will re-interrupt
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void await() throws InterruptedException {
+ synchronized (this) {
+ while (state != State.OPERATION_FAILED && state != State.OPERATION_SUCCEEDED) {
+ wait();
+ }
+ }
+ }
+
+ synchronized void handle() {
+ try {
+ if (state == State.READ_REQUESTED) {
+ read = ioManager.doSyncRead(fHandle, offset, data);
+ } else if (state == State.WRITE_REQUESTED) {
+ if (data != null) {
+ // single buffer
+ write = ioManager.doSyncWrite(fHandle, offset, data);
+ } else {
+ // multiple buffers
+ writes = ioManager.doSyncWrite(fHandle, offset, dataArray);
+ }
+ } else {
+ throw new IllegalStateException("IO Request with state = " + state);
+ }
+ state = State.OPERATION_SUCCEEDED;
+ } catch (Throwable th) { // NOSONAR: This method must never throw anything
+ state = State.OPERATION_FAILED;
+ failure = HyracksDataException.create(th);
+ }
+ notifyAll();
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ void recycle() {
+ reset();
+ freeRequests.offer(this);
+ }
+
+ public int getRead() {
+ return read;
+ }
+
+ public int getWrite() {
+ return write;
+ }
+
+ public long getWrites() {
+ return writes;
+ }
+
+ @Override
+ public void run() throws InterruptedException {
+ await();
+ }
+
+ public HyracksDataException getFailure() {
+ return failure;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequestHandler.java
new file mode 100644
index 0000000..84fd24b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequestHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class IoRequestHandler implements Runnable {
+ private static final Logger LOGGER = LogManager.getLogger();
+ public static final IoRequest POISON_PILL = new IoRequest(null, null, null);
+ private final int num;
+ private final BlockingQueue<IoRequest> queue;
+
+ public IoRequestHandler(int num, BlockingQueue<IoRequest> queue) {
+ this.num = num;
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName(getClass().getSimpleName() + "-" + num);
+ while (true) { // NOSONAR: Suppress 1 continue and 1 break
+ IoRequest next;
+ try {
+ next = queue.take();
+ } catch (InterruptedException e) { // NOSONAR: This is not supposed to be ever interrupted
+ LOGGER.log(Level.WARN, "Ignoring interrupt. IO threads should never be interrupted.");
+ continue;
+ }
+ if (next == POISON_PILL) {
+ LOGGER.log(Level.INFO, "Exiting");
+ InvokeUtil.doUninterruptibly(() -> queue.put(POISON_PILL));
+ if (Thread.interrupted()) {
+ LOGGER.log(Level.ERROR, "Ignoring interrupt. IO threads should never be interrupted.");
+ }
+ break;
+ }
+ next.handle();
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java
index b597b60..7861df0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java
@@ -61,32 +61,6 @@
}
/**
- * Close the AutoCloseable and suppress any Throwable thrown by the close call.
- * This method must NEVER throw any Throwable
- *
- * @param closable
- * the resource to close
- * @param root
- * the first exception encountered during release of resources
- * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null
- */
- public static Throwable close(AutoCloseable closable, Throwable root) {
- if (closable != null) {
- try {
- closable.close();
- } catch (Throwable th) { // NOSONAR Will be re-thrown
- try {
- LOGGER.log(Level.WARN, "Failure closing a closeable resource", th);
- } catch (Throwable loggingFailure) {
- // Do nothing
- }
- root = ExceptionUtils.suppress(root, th);
- }
- }
- return root;
- }
-
- /**
* Close the IIndexDataflowHelper and suppress any Throwable thrown by the close call.
* This method must NEVER throw any Throwable
*
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 7f915c6..1443bbc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -28,14 +28,11 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -47,9 +44,7 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.replication.IIOReplicationManager;
-import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.api.util.IoUtil;
-import org.apache.hyracks.storage.common.buffercache.CachedPage.State;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.storage.common.file.IFileMapManager;
import org.apache.logging.log4j.Level;
@@ -60,7 +55,6 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final int MAP_FACTOR = 3;
- private static final CachedPage POISON_PILL = new CachedPage();
private static final int MIN_CLEANED_COUNT_DIFF = 3;
private static final int PIN_MAX_WAIT_TIME = 50;
@@ -72,8 +66,7 @@
private final int pageSize;
private final int maxOpenFiles;
- private final ExecutorService executor;
- private final IIOManager ioManager;
+ final IIOManager ioManager;
private final CacheBucket[] pageMap;
private final IPageReplacementStrategy pageReplacementStrategy;
private final IPageCleanerPolicy pageCleanerPolicy;
@@ -82,7 +75,6 @@
private final Map<Integer, BufferedFileHandle> fileInfoMap;
private final AsyncFIFOPageQueueManager fifoWriter;
private final Queue<BufferCacheHeaderHelper> headerPageCache = new ConcurrentLinkedQueue<>();
- private final BlockingQueue<CachedPage> readRequests;
//DEBUG
private Level fileOpsLevel = Level.DEBUG;
@@ -111,43 +103,19 @@
this.pageReplacementStrategy = pageReplacementStrategy;
this.pageCleanerPolicy = pageCleanerPolicy;
this.fileMapManager = fileMapManager;
- int numReaders = ioManager.getIODevices().size() * 2;
- readRequests = new ArrayBlockingQueue<>(pageReplacementStrategy.getMaxAllowedNumPages());
- executor = Executors.newFixedThreadPool(numReaders + 1, threadFactory);
- try {
- fileInfoMap = new HashMap<>();
- cleanerThread = new CleanerThread();
- executor.execute(cleanerThread);
- for (int i = 0; i < numReaders; i++) {
- executor.execute(new ReaderThread(i));
- }
- closed = false;
- fifoWriter = new AsyncFIFOPageQueueManager(this);
- if (DEBUG) {
- confiscatedPages = new ArrayList<>();
- confiscatedPagesOwner = new HashMap<>();
- confiscateLock = new ReentrantLock();
- pinnedPageOwner = new ConcurrentHashMap<>();
- }
- } catch (Throwable th) {
- try {
- throw th;
- } finally {
- readRequests.offer(POISON_PILL); // NOSONAR will always succeed since the queue is empty
- executor.shutdown();
- try {
- if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
- LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service");
- }
- } catch (InterruptedException e) {
- LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service");
- Thread.currentThread().interrupt();
- th.addSuppressed(e);
- } catch (Throwable e) {
- LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service", e);
- th.addSuppressed(e);
- }
- }
+
+ Executor executor = Executors.newCachedThreadPool(threadFactory);
+ fileInfoMap = new HashMap<>();
+ cleanerThread = new CleanerThread();
+ executor.execute(cleanerThread);
+ closed = false;
+
+ fifoWriter = new AsyncFIFOPageQueueManager(this);
+ if (DEBUG) {
+ confiscatedPages = new ArrayList<>();
+ confiscatedPagesOwner = new HashMap<>();
+ confiscateLock = new ReentrantLock();
+ pinnedPageOwner = new ConcurrentHashMap<>();
}
}
@@ -201,48 +169,38 @@
pinSanityCheck(dpid);
}
CachedPage cPage = findPage(dpid);
- if (cPage.state != State.VALID) {
- synchronized (cPage) {
- if (!newPage) {
- if (DEBUG) {
- confiscateLock.lock();
- try {
- for (CachedPage c : confiscatedPages) {
- if (c.dpid == dpid && c.confiscated.get()) {
- throw new IllegalStateException();
- }
- }
- } finally {
- confiscateLock.unlock();
+ if (!newPage) {
+ if (DEBUG) {
+ confiscateLock.lock();
+ try {
+ for (CachedPage c : confiscatedPages) {
+ if (c.dpid == dpid && c.confiscated.get()) {
+ throw new IllegalStateException();
}
}
- // Resolve race of multiple threads trying to read the page from
- // disk.
-
- if (cPage.state != State.VALID) {
- try {
- // Will attempt to re-read even if previous read failed
- if (cPage.state == State.INVALID || cPage.state == State.READ_FAILED) {
- // submit request to read
- cPage.state = State.READ_REQUESTED;
- readRequests.put(cPage);
- }
- cPage.awaitRead();
- } catch (InterruptedException e) {
- cPage.state = State.INVALID;
- unpin(cPage);
- throw HyracksDataException.create(e);
- } catch (Throwable th) {
- unpin(cPage);
- throw HyracksDataException.create(th);
- }
- }
-
- } else {
- cPage.state = State.VALID;
- cPage.notifyAll();
+ } finally {
+ confiscateLock.unlock();
}
}
+ // Resolve race of multiple threads trying to read the page from
+ // disk.
+ synchronized (cPage) {
+ if (!cPage.valid) {
+ try {
+ tryRead(cPage);
+ cPage.valid = true;
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Failure while trying to read a page from disk", e);
+ throw e;
+ } finally {
+ if (!cPage.valid) {
+ unpin(cPage);
+ }
+ }
+ }
+ }
+ } else {
+ cPage.valid = true;
}
pageReplacementStrategy.notifyCachePageAccess(cPage);
if (DEBUG) {
@@ -491,7 +449,7 @@
buffer.append(" ").append(cp.cpid).append(" -> [")
.append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
.append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
- .append(", ").append(cp.state).append(", ")
+ .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
.append(cp.confiscated.get() ? "confiscated" : "physical").append(", ")
.append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
cp = cp.next;
@@ -522,7 +480,7 @@
if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) {
return false;
}
- if (c.state == State.VALID) {
+ if (c.valid) {
reachableDpids.add(c.dpid);
}
}
@@ -561,9 +519,6 @@
read(cPage);
return;
} catch (HyracksDataException readException) {
- if (Thread.interrupted()) {
- LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
- }
if (readException.getErrorCode() == ErrorCode.CANNOT_READ_CLOSED_FILE && i <= MAX_PAGE_READ_ATTEMPTS) {
/**
* if the read failure was due to another thread closing the file channel because
@@ -575,7 +530,8 @@
LOGGER.log(Level.WARN, String.format("Failed to read page. Retrying attempt (%d/%d)", i + 1,
MAX_PAGE_READ_ATTEMPTS), readException);
} catch (InterruptedException e) {
- LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
}
} else {
throw readException;
@@ -714,56 +670,6 @@
}
}
- private class ReaderThread implements Runnable {
- private final int num;
-
- private ReaderThread(int num) {
- this.num = num;
- }
-
- @Override
- public void run() {
- Thread.currentThread().setName("Buffer-Cache-Reader-" + num);
- while (true) {
- CachedPage next;
- try {
- next = readRequests.take();
- } catch (InterruptedException e) {
- LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
- break;
- }
- if (next == POISON_PILL) {
- LOGGER.log(Level.INFO, "Exiting");
- InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
- if (Thread.interrupted()) {
- LOGGER.log(Level.ERROR, "Ignoring interrupt. Reader threads should never be interrupted.");
- }
- break;
- }
- synchronized (next) {
- if (next.state != State.VALID) {
- if (next.state != State.READ_REQUESTED) {
- LOGGER.log(Level.ERROR,
- "Exiting BufferCache reader thread. Took a page with state = {} out of the queue",
- next.state);
- break;
- }
- try {
- tryRead(next);
- next.state = State.VALID;
- } catch (HyracksDataException e) {
- next.readFailure = e;
- next.state = State.READ_FAILED;
- LOGGER.log(Level.WARN, "Failed to read a page", e);
- }
- next.notifyAll();
- }
- }
- }
- }
-
- }
-
private class CleanerThread implements Runnable {
private volatile boolean shutdownStart = false;
private volatile boolean shutdownComplete = false;
@@ -893,16 +799,6 @@
});
fileInfoMap.clear();
}
- InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
- executor.shutdown();
- try {
- if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
- LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service");
- }
- } catch (InterruptedException e) {
- LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service");
- Thread.currentThread().interrupt();
- }
}
@Override
@@ -1447,7 +1343,7 @@
}
try {
cPage.reset(cPage.dpid);
- cPage.state = State.VALID;
+ cPage.valid = true;
cPage.next = bucket.cachedPage;
bucket.cachedPage = cPage;
cPage.pinCount.decrementAndGet();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index d7a55af..02eb8bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -23,19 +23,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
/**
* @author yingyib
*/
public class CachedPage implements ICachedPageInternal {
- public enum State {
- INVALID,
- READ_REQUESTED,
- READ_FAILED,
- VALID
- }
-
final int cpid;
ByteBuffer buffer;
public final AtomicInteger pinCount;
@@ -45,7 +36,7 @@
private final IPageReplacementStrategy pageReplacementStrategy;
volatile long dpid; // disk page id (composed of file id and page id)
CachedPage next;
- volatile State state;
+ volatile boolean valid;
final AtomicBoolean confiscated;
private IQueueInfo queueInfo;
private int multiplier;
@@ -53,7 +44,6 @@
// DEBUG
private static final boolean DEBUG = false;
private final StackTraceElement[] ctorStack;
- Throwable readFailure;
//Constructor for making dummy entry for FIFO queue
public CachedPage() {
@@ -82,7 +72,7 @@
latch = new ReentrantReadWriteLock(true);
replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
dpid = -1;
- state = State.INVALID;
+ valid = false;
confiscated = new AtomicBoolean(false);
queueInfo = null;
ctorStack = DEBUG ? new Throwable().getStackTrace() : null;
@@ -91,7 +81,7 @@
public void reset(long dpid) {
this.dpid = dpid;
dirty.set(false);
- state = State.INVALID;
+ valid = false;
confiscated.set(false);
pageReplacementStrategy.notifyCachePageReset(this);
queueInfo = null;
@@ -215,30 +205,4 @@
public boolean isLargePage() {
return multiplier > 1;
}
-
- /**
- * Wait for the page requested to be read to complete the read operation
- * This method is uninterrubtible
- *
- * @throws HyracksDataException
- */
- public synchronized void awaitRead() throws HyracksDataException {
- boolean interrupted = false;
- try {
- while (state != State.VALID && state != State.READ_FAILED) {
- try {
- wait();
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- if (state == State.READ_FAILED) {
- throw HyracksDataException.create(readFailure);
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 3060b25..673a27f 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -21,7 +21,6 @@
import java.io.File;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -110,7 +109,7 @@
List<IODeviceHandle> devices = new ArrayList<>();
devices.add(new IODeviceHandle(new File(System.getProperty("user.dir") + File.separator + "target"),
"iodev_test_wa"));
- ioManager = new IOManager(devices, Executors.newCachedThreadPool(), new DefaultDeviceResolver());
+ ioManager = new IOManager(devices, new DefaultDeviceResolver());
}
return ioManager;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index d7578af..ebfaeb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.Executors;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.client.NodeControllerInfo;
@@ -40,6 +39,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.control.nc.io.DefaultDeviceResolver;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.core.Appender;
@@ -49,8 +49,20 @@
public class TestUtils {
public static IHyracksTaskContext create(int frameSize) {
+ IOManager ioManager = null;
try {
- IOManager ioManager = createIoManager();
+ ioManager = createIoManager();
+ return create(frameSize, ioManager);
+ } catch (Exception e) {
+ if (ioManager != null) {
+ CleanupUtils.close(ioManager, e);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static IHyracksTaskContext create(int frameSize, IOManager ioManager) {
+ try {
INCServiceContext serviceCtx = new TestNCServiceContext(ioManager, null);
TestJobletContext jobletCtx = new TestJobletContext(frameSize, serviceCtx, new JobId(0));
TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), 0), 0);
@@ -64,7 +76,7 @@
private static IOManager createIoManager() throws HyracksException {
List<IODeviceHandle> devices = new ArrayList<>();
devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "."));
- return new IOManager(devices, Executors.newCachedThreadPool(), new DefaultDeviceResolver());
+ return new IOManager(devices, new DefaultDeviceResolver());
}
public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java
index 2243ee3..ea22355 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java
@@ -73,85 +73,66 @@
@Test
public void test01() throws Exception {
-
TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, MAX_OPEN_FILES);
IBufferCache bufferCache = harness.getBufferCache();
-
// declare fields
int fieldCount = 2;
ITypeTraits[] typeTraits = new ITypeTraits[fieldCount];
typeTraits[0] = IntegerPointable.TYPE_TRAITS;
typeTraits[1] = IntegerPointable.TYPE_TRAITS;
-
// declare keys
int keyFieldCount = 1;
IBinaryComparatorFactory[] cmpFactories = new IBinaryComparatorFactory[keyFieldCount];
cmpFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
-
BTreeTypeAwareTupleWriterFactory tupleWriterFactory = new BTreeTypeAwareTupleWriterFactory(typeTraits, false);
ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(tupleWriterFactory);
ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(tupleWriterFactory);
ITreeIndexMetadataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
-
IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) leafFrameFactory.createFrame();
IBTreeInteriorFrame interiorFrame = (IBTreeInteriorFrame) interiorFrameFactory.createFrame();
ITreeIndexMetadataFrame metaFrame = metaFrameFactory.createFrame();
-
IMetadataPageManager freePageManager = new LinkedMetaDataPageManager(bufferCache, metaFrameFactory);
-
BTree btree = new BTree(bufferCache, freePageManager, interiorFrameFactory, leafFrameFactory, cmpFactories,
fieldCount, harness.getFileReference());
btree.create();
btree.activate();
-
Random rnd = new Random();
rnd.setSeed(50);
-
long start = System.currentTimeMillis();
-
if (LOGGER.isInfoEnabled()) {
LOGGER.info("INSERTING INTO TREE");
}
-
IFrame frame = new VSizeFrame(ctx);
FrameTupleAppender appender = new FrameTupleAppender();
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
DataOutput dos = tb.getDataOutput();
-
ISerializerDeserializer[] recDescSers =
{ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
RecordDescriptor recDesc = new RecordDescriptor(recDescSers);
IFrameTupleAccessor accessor = new FrameTupleAccessor(recDesc);
accessor.reset(frame.getBuffer());
FrameTupleReference tuple = new FrameTupleReference();
-
IndexAccessParameters actx =
new IndexAccessParameters(TestOperationCallback.INSTANCE, TestOperationCallback.INSTANCE);
ITreeIndexAccessor indexAccessor = btree.createAccessor(actx);
// 10000
for (int i = 0; i < 100000; i++) {
-
int f0 = rnd.nextInt() % 100000;
int f1 = 5;
-
tb.reset();
IntegerSerializerDeserializer.INSTANCE.serialize(f0, dos);
tb.addFieldEndOffset();
IntegerSerializerDeserializer.INSTANCE.serialize(f1, dos);
tb.addFieldEndOffset();
-
appender.reset(frame, true);
appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-
tuple.reset(accessor, 0);
-
if (LOGGER.isInfoEnabled()) {
if (i % 10000 == 0) {
long end = System.currentTimeMillis();
LOGGER.info("INSERTING " + i + " : " + f0 + " " + f1 + " " + (end - start));
}
}
-
try {
indexAccessor.insert(tuple);
} catch (HyracksDataException e) {
@@ -161,18 +142,15 @@
}
}
}
-
TreeIndexStatsGatherer statsGatherer = new TreeIndexStatsGatherer(bufferCache, freePageManager,
harness.getFileReference(), btree.getRootPageId());
TreeIndexStats stats = statsGatherer.gatherStats(leafFrame, interiorFrame, metaFrame);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("\n" + stats.toString());
}
-
TreeIndexBufferCacheWarmup bufferCacheWarmup =
new TreeIndexBufferCacheWarmup(bufferCache, freePageManager, harness.getFileReference());
bufferCacheWarmup.warmup(leafFrame, metaFrame, new int[] { 1, 2 }, new int[] { 2, 5 });
-
btree.deactivate();
btree.destroy();
bufferCache.close();
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
index 56618e7..8c1124f 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
@@ -31,7 +31,6 @@
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.Executors;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -247,7 +246,7 @@
String iodevPath = System.getProperty("java.io.tmpdir") + sep + "test_iodev" + i;
devices.add(new IODeviceHandle(new File(iodevPath), "wa"));
}
- return new IOManager(devices, Executors.newCachedThreadPool(), new DefaultDeviceResolver());
+ return new IOManager(devices, new DefaultDeviceResolver());
}
private FileReference simulateMerge(ILSMIndexFileManager fileManager, FileReference a, FileReference b)