[NO ISSUE] Make IOManager more configurable
Change-Id: I1c8ad11c2b8b983ef4bf7cf78c2f068accddfff4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3133
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Contrib: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index e663b49..8b8f5a0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -89,6 +89,7 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.network.INetworkSecurityManager;
+import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -185,6 +186,7 @@
public void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory,
boolean initialRun) throws IOException {
ioManager = getServiceContext().getIoManager();
+ int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
threadExecutor =
MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
@@ -239,11 +241,11 @@
replicationChannel = new ReplicationChannel(this);
bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(),
- storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory(),
+ storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen, getServiceContext().getThreadFactory(),
replicationManager);
} else {
bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(),
- storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory());
+ storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen, getServiceContext().getThreadFactory());
}
/*
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 3619cbb..dd92798 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -88,7 +88,9 @@
TRACE_CATEGORIES(STRING_ARRAY, new String[0]),
KEY_STORE_PATH(STRING, (String) null),
TRUST_STORE_PATH(STRING, (String) null),
- KEY_STORE_PASSWORD(STRING, (String) null);
+ KEY_STORE_PASSWORD(STRING, (String) null),
+ IO_WORKERS_PER_PARTITION(POSITIVE_INTEGER, 2),
+ IO_QUEUE_SIZE(POSITIVE_INTEGER, 10);
private final IOptionType parser;
private final String defaultValueDescription;
@@ -217,8 +219,12 @@
return "A fully-qualified path to a trust store file that will be used for secured connections";
case KEY_STORE_PASSWORD:
return "The password to the provided key store";
+ case IO_WORKERS_PER_PARTITION:
+ return "Number of threads per partition used to write and read from storage";
+ case IO_QUEUE_SIZE:
+ return "Length of the queue used for requests to write and read";
default:
- throw new IllegalStateException("NYI: " + this);
+ throw new IllegalStateException("Not yet implemented: " + this);
}
}
@@ -575,4 +581,12 @@
public void setTrustStorePath(String keyStorePath) {
configManager.set(nodeId, Option.TRUST_STORE_PATH, keyStorePath);
}
+
+ public int getIOParallelism() {
+ return appConfig.getInt(Option.IO_WORKERS_PER_PARTITION);
+ }
+
+ public int getIOQueueSize() {
+ return appConfig.getInt(Option.IO_QUEUE_SIZE);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 317d59a..517169b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -214,8 +214,8 @@
ncShutdownHook = new NCShutdownHook(this);
Runtime.getRuntime().addShutdownHook(ncShutdownHook);
Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
- ioManager =
- new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
+ ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()),
+ application.getFileDeviceResolver(), ncConfig.getIOParallelism(), ncConfig.getIOQueueSize());
try {
workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
jobletMap = new ConcurrentHashMap<>();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index b5cb21a..14404d2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -56,7 +56,6 @@
/*
* Constants
*/
- public static final int IO_REQUEST_QUEUE_SIZE = 100; // TODO: Make configurable
private static final Logger LOGGER = LogManager.getLogger();
private static final String WORKSPACE_FILE_SUFFIX = ".waf";
private static final FilenameFilter WORKSPACE_FILES_FILTER = (dir, name) -> name.endsWith(WORKSPACE_FILE_SUFFIX);
@@ -74,7 +73,8 @@
private int workspaceIndex;
private final IFileDeviceResolver deviceComputer;
- public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer) throws HyracksDataException {
+ public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer, int ioParallelism, int queueSize)
+ throws HyracksDataException {
this.ioDevices = Collections.unmodifiableList(devices);
checkDeviceValidity(devices);
workspaces = new ArrayList<>();
@@ -93,9 +93,9 @@
}
workspaceIndex = 0;
this.deviceComputer = deviceComputer;
- submittedRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
- freeRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
- int numIoThreads = ioDevices.size() * 2;
+ submittedRequests = new ArrayBlockingQueue<>(queueSize);
+ freeRequests = new ArrayBlockingQueue<>(queueSize);
+ int numIoThreads = ioDevices.size() * ioParallelism;
executor = Executors.newFixedThreadPool(numIoThreads);
for (int i = 0; i < numIoThreads; i++) {
executor.execute(new IoRequestHandler(i, submittedRequests));
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index d107ff2..7ba81e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -60,7 +60,7 @@
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, 32768, 50);
bufferCache = new BufferCache(appCtx.getIoManager(), prs, new DelayPageCleanerPolicy(1000), fileMapManager, 100,
- threadFactory);
+ 10, threadFactory);
ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
localResourceRepository = localResourceRepositoryFactory.createRepository();
resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index 423925b..31037ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -50,11 +50,6 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-control-nc</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-util</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index b40f252..c4c5d8a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -18,8 +18,6 @@
*/
package org.apache.hyracks.storage.common.buffercache;
-import static org.apache.hyracks.control.nc.io.IOManager.IO_REQUEST_QUEUE_SIZE;
-
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -76,8 +74,7 @@
private final CleanerThread cleanerThread;
private final Map<Integer, BufferedFileHandle> fileInfoMap;
private final AsyncFIFOPageQueueManager fifoWriter;
- private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache =
- new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
+ private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache;
private IIOReplicationManager ioReplicationManager;
private final List<ICachedPageInternal> cachedPages = new ArrayList<>();
@@ -93,8 +90,9 @@
private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner;
public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy,
- IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles,
+ IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, int ioQueuelen,
ThreadFactory threadFactory) {
+ this.headerPageCache = new ArrayBlockingQueue<>(ioQueuelen);
this.ioManager = ioManager;
this.pageSize = pageReplacementStrategy.getPageSize();
this.maxOpenFiles = maxOpenFiles;
@@ -124,10 +122,11 @@
//this constructor is used when replication is enabled to pass the IIOReplicationManager
public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy,
- IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles,
+ IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, int ioQueueLen,
ThreadFactory threadFactory, IIOReplicationManager ioReplicationManager) {
- this(ioManager, pageReplacementStrategy, pageCleanerPolicy, fileMapManager, maxOpenFiles, threadFactory);
+ this(ioManager, pageReplacementStrategy, pageCleanerPolicy, fileMapManager, maxOpenFiles, ioQueueLen,
+ threadFactory);
this.ioReplicationManager = ioReplicationManager;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 7b7e850..a2da285 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -107,7 +107,7 @@
List<IODeviceHandle> devices = new ArrayList<>();
devices.add(new IODeviceHandle(new File(System.getProperty("user.dir") + File.separator + "target"),
"iodev_test_wa"));
- ioManager = new IOManager(devices, new DefaultDeviceResolver());
+ ioManager = new IOManager(devices, new DefaultDeviceResolver(), 2, 10);
}
return ioManager;
}
@@ -152,7 +152,7 @@
IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, pageSize, numPages);
IFileMapProvider fileMapProvider = getFileMapProvider();
bufferCache = new BufferCache(ioManager, prs, new DelayPageCleanerPolicy(1000),
- (IFileMapManager) fileMapProvider, maxOpenFiles, threadFactory);
+ (IFileMapManager) fileMapProvider, maxOpenFiles, 10, threadFactory);
return bufferCache;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index ebfaeb8..e36b655 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -76,7 +76,7 @@
private static IOManager createIoManager() throws HyracksException {
List<IODeviceHandle> devices = new ArrayList<>();
devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "."));
- return new IOManager(devices, new DefaultDeviceResolver());
+ return new IOManager(devices, new DefaultDeviceResolver(), 2, 10);
}
public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
index 22456e8..5d10603 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
@@ -256,7 +256,7 @@
String iodevPath = System.getProperty("java.io.tmpdir") + sep + "test_iodev" + i;
devices.add(new IODeviceHandle(new File(iodevPath), "wa"));
}
- return new IOManager(devices, new DefaultDeviceResolver());
+ return new IOManager(devices, new DefaultDeviceResolver(), 2, 10);
}
private FileReference simulateMerge(ILSMIndexFileManager fileManager, FileReference a, FileReference b)
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java
index e2a875b..b0ecc1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java
@@ -38,8 +38,8 @@
public void testPrefixNames() throws HyracksDataException {
IODeviceHandle shorter = new IODeviceHandle(new File("/tmp/tst/1"), "storage");
IODeviceHandle longer = new IODeviceHandle(new File("/tmp/tst/11"), "storage");
- IOManager ioManager =
- new IOManager(Arrays.asList(new IODeviceHandle[] { shorter, longer }), new DefaultDeviceResolver());
+ IOManager ioManager = new IOManager(Arrays.asList(new IODeviceHandle[] { shorter, longer }),
+ new DefaultDeviceResolver(), 2, 10);
FileReference f = ioManager.resolveAbsolutePath("/tmp/tst/11/storage/Foo_idx_foo/my_btree");
Assert.assertEquals("/tmp/tst/11/storage/Foo_idx_foo/my_btree", f.getAbsolutePath());
}
@@ -48,8 +48,8 @@
public void testDuplicates() throws HyracksDataException {
IODeviceHandle first = new IODeviceHandle(new File("/tmp/tst/1"), "storage");
IODeviceHandle second = new IODeviceHandle(new File("/tmp/tst/1"), "storage");
- IOManager ioManager =
- new IOManager(Arrays.asList(new IODeviceHandle[] { first, second }), new DefaultDeviceResolver());
+ IOManager ioManager = new IOManager(Arrays.asList(new IODeviceHandle[] { first, second }),
+ new DefaultDeviceResolver(), 2, 19);
}
@After