Orchestrated the use of executors and thread factory and added buffer cache core dump.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
index 6ee719b..54aac5e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.application;
import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
import edu.uci.ics.hyracks.api.messages.IMessageBroker;
@@ -39,4 +40,8 @@
public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer();
+ public ThreadFactory getThreadFactory();
+
+ public void setThreadFactory(ThreadFactory threadFactory);
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
index 2e7feb5..668950b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.Executor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -47,4 +48,6 @@
public void close(IFileHandle fHandle) throws HyracksDataException;
public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
+
+ public void setExecutor(Executor executor);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ApplicationThreadExecutor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ApplicationThreadExecutor.java
deleted file mode 100644
index 948584e..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ApplicationThreadExecutor.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.lifecycle;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-public class ApplicationThreadExecutor implements Executor {
-
- private final ThreadFactory threadFactory;
- private final Executor executor = Executors.newCachedThreadPool();
-
- public ApplicationThreadExecutor(ThreadFactory threadFactory) {
- this.threadFactory = threadFactory;
- }
-
- @Override
- public void execute(Runnable command) {
- Thread t = threadFactory.newThread(command);
- t.setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
- executor.execute(t);
- }
-
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
index fb96456..e273863 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
@@ -14,11 +14,12 @@
*/
package edu.uci.ics.hyracks.api.lifecycle;
+import java.io.IOException;
import java.io.OutputStream;
public interface ILifeCycleComponent {
public void start();
- public void stop(boolean dumpState, OutputStream ouputStream);
+ public void stop(boolean dumpState, OutputStream ouputStream) throws IOException;
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
index 314ac67..8f0580d 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
@@ -28,17 +28,20 @@
public static LifeCycleComponentManager INSTANCE = new LifeCycleComponentManager();
public static final class Config {
- public static final String KEY_DUMP_PATH = "DUMP_PATH";
+ public static final String DUMP_PATH_KEY = "DUMP_PATH";
}
private static final Logger LOGGER = Logger.getLogger(LifeCycleComponentManager.class.getName());
- private final List<ILifeCycleComponent> components = new ArrayList<ILifeCycleComponent>();
- private boolean stopInitiated = false;
+ private final List<ILifeCycleComponent> components;
+ private boolean stopInitiated;
private String dumpPath;
- private boolean configured = false;
+ private boolean configured;
private LifeCycleComponentManager() {
+ components = new ArrayList<ILifeCycleComponent>();
+ stopInitiated = false;
+ configured = false;
}
@Override
@@ -122,7 +125,7 @@
@Override
public void configure(Map<String, String> configuration) {
- dumpPath = configuration.get(Config.KEY_DUMP_PATH);
+ dumpPath = configuration.get(Config.DUMP_PATH_KEY);
if (dumpPath == null) {
dumpPath = System.getProperty("user.dir");
if (LOGGER.isLoggable(Level.SEVERE)) {
@@ -130,7 +133,7 @@
}
}
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.severe("LifecycleComponentManager configurd " + this);
+ LOGGER.severe("LifecycleComponentManager configured " + this);
}
configured = true;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 9b8a996..6548ec7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -119,7 +119,7 @@
private final WorkQueue workQueue;
- private final ExecutorService executor;
+ private ExecutorService executor;
private final Timer timer;
@@ -140,7 +140,6 @@
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
ipAddressNodeNameMap = new HashMap<String, Set<String>>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
- executor = Executors.newCachedThreadPool();
IIPCI ccIPCI = new ClusterControllerIPCI();
clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
new CCNCFunctions.SerializerDeserializer());
@@ -223,6 +222,7 @@
.size()]);
aep.start(appCtx, args);
}
+ executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index 58ae79e..04ee3bd 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -16,6 +16,7 @@
import java.io.IOException;
import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.application.IApplicationContext;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
@@ -28,6 +29,11 @@
protected Serializable distributedState;
protected IMessageBroker messageBroker;
protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer();
+ protected ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+ };
public ApplicationContext(ServerContext serverCtx) throws IOException {
this.serverCtx = serverCtx;
@@ -52,4 +58,14 @@
public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
return this.jobSerDeContainer;
}
+
+ @Override
+ public ThreadFactory getThreadFactory() {
+ return threadFactory;
+ }
+
+ @Override
+ public void setThreadFactory(ThreadFactory threadFactory) {
+ this.threadFactory = threadFactory;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index a1c3b08..f82aa37 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -99,9 +99,9 @@
private final NetworkManager netManager;
- private final IDatasetPartitionManager datasetPartitionManager;
+ private IDatasetPartitionManager datasetPartitionManager;
- private final DatasetNetworkManager datasetNetworkManager;
+ private DatasetNetworkManager datasetNetworkManager;
private final WorkQueue queue;
@@ -115,7 +115,7 @@
private final Map<JobId, Joblet> jobletMap;
- private final ExecutorService executor;
+ private ExecutorService executor;
private NodeParameters nodeParameters;
@@ -142,22 +142,17 @@
public NodeControllerService(NCConfig ncConfig) throws Exception {
this.ncConfig = ncConfig;
id = ncConfig.nodeId;
- executor = Executors.newCachedThreadPool();
NodeControllerIPCI ipci = new NodeControllerIPCI();
ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
new CCNCFunctions.SerializerDeserializer());
- this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices), executor));
+
+ this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices)));
if (id == null) {
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
- datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
- ncConfig.resultHistorySize);
- datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
- datasetPartitionManager, ncConfig.nNetThreads);
-
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
timer = new Timer(true);
@@ -219,6 +214,14 @@
fv.setValue(ncInfos);
}
+ private void init() throws Exception {
+ ctx.getIOManager().setExecutor(executor);
+ datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
+ ncConfig.resultHistorySize);
+ datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
+ datasetPartitionManager, ncConfig.nNetThreads);
+ }
+
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting NodeControllerService");
@@ -226,6 +229,7 @@
netManager.start();
startApplication();
+ init();
datasetNetworkManager.start();
IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
@@ -280,6 +284,7 @@
.toArray(new String[ncConfig.appArgs.size()]);
ncAppEntryPoint.start(appCtx, args);
}
+ executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
index 3b13f32..c94953b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
@@ -33,15 +33,19 @@
public class IOManager implements IIOManager {
private final List<IODeviceHandle> ioDevices;
- private final Executor executor;
+ private Executor executor;
private final List<IODeviceHandle> workAreaIODevices;
private int workAreaDeviceIndex;
public IOManager(List<IODeviceHandle> devices, Executor executor) throws HyracksException {
- this.ioDevices = Collections.unmodifiableList(devices);
+ this(devices);
this.executor = executor;
+ }
+
+ public IOManager(List<IODeviceHandle> devices) throws HyracksException {
+ this.ioDevices = Collections.unmodifiableList(devices);
workAreaIODevices = new ArrayList<IODeviceHandle>();
for (IODeviceHandle d : ioDevices) {
if (d.getWorkAreaPath() != null) {
@@ -55,6 +59,10 @@
workAreaDeviceIndex = 0;
}
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
@Override
public List<IODeviceHandle> getIODevices() {
return ioDevices;
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
index aa5ce1d..336c3fc 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.examples.btree.helper;
+import java.util.concurrent.ThreadFactory;
+
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -42,13 +44,18 @@
private ILocalResourceRepository localResourceRepository;
private IIndexLifecycleManager lcManager;
private ResourceIdFactory resourceIdFactory;
+ private ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+ };
public RuntimeContext(INCApplicationContext appCtx) throws HyracksDataException {
fileMapManager = new TransientFileMapManager();
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
- new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100);
+ new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100, threadFactory);
lcManager = new IndexLifecycleManager();
ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
localResourceRepository = localResourceRepositoryFactory.createRepository();
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java
index 9badc58..c4f43b9 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java
@@ -3,9 +3,8 @@
import java.util.List;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
-public interface IIndexLifecycleManager extends ILifeCycleComponent {
+public interface IIndexLifecycleManager {
public IIndex getIndex(long resourceID);
public void register(long resourceID, IIndex index) throws HyracksDataException;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index 63557f7..13ae7b1 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -1,5 +1,7 @@
package edu.uci.ics.hyracks.storage.am.common.dataflow;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
@@ -8,199 +10,214 @@
import java.util.Map;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-public class IndexLifecycleManager implements IIndexLifecycleManager {
- private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100
- // megabytes
+public class IndexLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent {
+ private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100 megabytes
- private final Map<Long, IndexInfo> indexInfos;
- private final long memoryBudget;
+ private final Map<Long, IndexInfo> indexInfos;
+ private final long memoryBudget;
- private long memoryUsed;
+ private long memoryUsed;
- public IndexLifecycleManager() {
- this(DEFAULT_MEMORY_BUDGET);
- }
+ public IndexLifecycleManager() {
+ this(DEFAULT_MEMORY_BUDGET);
+ }
- public IndexLifecycleManager(long memoryBudget) {
- this.indexInfos = new HashMap<Long, IndexInfo>();
- this.memoryBudget = memoryBudget;
- this.memoryUsed = 0;
- LifeCycleComponentManager.INSTANCE.register(this);
- }
+ public IndexLifecycleManager(long memoryBudget) {
+ this.indexInfos = new HashMap<Long, IndexInfo>();
+ this.memoryBudget = memoryBudget;
+ this.memoryUsed = 0;
+ }
- private boolean evictCandidateIndex() throws HyracksDataException {
- // Why min()? As a heuristic for eviction, we will take an open index
- // (an index consuming memory)
- // that is not being used (refcount == 0) and has been least recently
- // used. The sort order defined
- // for IndexInfo maintains this. See IndexInfo.compareTo().
- IndexInfo info = Collections.min(indexInfos.values());
- if (info.referenceCount != 0 || !info.isOpen) {
- return false;
- }
+ private boolean evictCandidateIndex() throws HyracksDataException {
+ // Why min()? As a heuristic for eviction, we will take an open index
+ // (an index consuming memory)
+ // that is not being used (refcount == 0) and has been least recently
+ // used. The sort order defined
+ // for IndexInfo maintains this. See IndexInfo.compareTo().
+ IndexInfo info = Collections.min(indexInfos.values());
+ if (info.referenceCount != 0 || !info.isOpen) {
+ return false;
+ }
- info.index.deactivate();
- memoryUsed -= info.index.getMemoryAllocationSize();
- info.isOpen = false;
+ info.index.deactivate();
+ memoryUsed -= info.index.getMemoryAllocationSize();
+ info.isOpen = false;
- return true;
- }
+ return true;
+ }
- @Override
- public IIndex getIndex(long resourceID) {
- IndexInfo info = indexInfos.get(resourceID);
- return info == null ? null : info.index;
- }
+ @Override
+ public IIndex getIndex(long resourceID) {
+ IndexInfo info = indexInfos.get(resourceID);
+ return info == null ? null : info.index;
+ }
- @Override
- public void register(long resourceID, IIndex index)
- throws HyracksDataException {
- if (indexInfos.containsKey(resourceID)) {
- throw new HyracksDataException("Index with resource ID "
- + resourceID + " already exists.");
- }
+ @Override
+ public void register(long resourceID, IIndex index) throws HyracksDataException {
+ if (indexInfos.containsKey(resourceID)) {
+ throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
+ }
- indexInfos.put(resourceID, new IndexInfo(index));
- }
+ indexInfos.put(resourceID, new IndexInfo(index));
+ }
- @Override
- public void unregister(long resourceID) throws HyracksDataException {
- IndexInfo info = indexInfos.remove(resourceID);
- if (info == null) {
- throw new HyracksDataException("Index with resource ID "
- + resourceID + " does not exist.");
- }
+ @Override
+ public void unregister(long resourceID) throws HyracksDataException {
+ IndexInfo info = indexInfos.remove(resourceID);
+ if (info == null) {
+ throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
+ }
- if (info.referenceCount != 0) {
- indexInfos.put(resourceID, info);
- throw new HyracksDataException(
- "Cannot remove index while it is open.");
- }
+ if (info.referenceCount != 0) {
+ indexInfos.put(resourceID, info);
+ throw new HyracksDataException("Cannot remove index while it is open.");
+ }
- if (info.isOpen) {
- info.index.deactivate();
- memoryUsed -= info.index.getMemoryAllocationSize();
- }
- }
+ if (info.isOpen) {
+ info.index.deactivate();
+ memoryUsed -= info.index.getMemoryAllocationSize();
+ }
+ }
- @Override
- public void open(long resourceID) throws HyracksDataException {
- IndexInfo info = indexInfos.get(resourceID);
- if (info == null) {
- throw new HyracksDataException(
- "Failed to open index with resource ID " + resourceID
- + " since it does not exist.");
- }
+ @Override
+ public void open(long resourceID) throws HyracksDataException {
+ IndexInfo info = indexInfos.get(resourceID);
+ if (info == null) {
+ throw new HyracksDataException("Failed to open index with resource ID " + resourceID
+ + " since it does not exist.");
+ }
- long inMemorySize = info.index.getMemoryAllocationSize();
- while (memoryUsed + inMemorySize > memoryBudget) {
- if (!evictCandidateIndex()) {
- throw new HyracksDataException(
- "Cannot activate index since memory budget would be exceeded.");
- }
- }
+ long inMemorySize = info.index.getMemoryAllocationSize();
+ while (memoryUsed + inMemorySize > memoryBudget) {
+ if (!evictCandidateIndex()) {
+ throw new HyracksDataException("Cannot activate index since memory budget would be exceeded.");
+ }
+ }
- if (!info.isOpen) {
- info.index.activate();
- info.isOpen = true;
- memoryUsed += inMemorySize;
- }
- info.touch();
- }
+ if (!info.isOpen) {
+ info.index.activate();
+ info.isOpen = true;
+ memoryUsed += inMemorySize;
+ }
+ info.touch();
+ }
- @Override
- public void close(long resourceID) {
- indexInfos.get(resourceID).untouch();
- }
+ @Override
+ public void close(long resourceID) {
+ indexInfos.get(resourceID).untouch();
+ }
- private class IndexInfo implements Comparable<IndexInfo> {
- private final IIndex index;
- private int referenceCount;
- private long lastAccess;
- private boolean isOpen;
+ private class IndexInfo implements Comparable<IndexInfo> {
+ private final IIndex index;
+ private int referenceCount;
+ private long lastAccess;
+ private boolean isOpen;
- public IndexInfo(IIndex index) {
- this.index = index;
- this.lastAccess = -1;
- this.referenceCount = 0;
- this.isOpen = false;
- }
+ public IndexInfo(IIndex index) {
+ this.index = index;
+ this.lastAccess = -1;
+ this.referenceCount = 0;
+ this.isOpen = false;
+ }
- public void touch() {
- lastAccess = System.currentTimeMillis();
- referenceCount++;
- }
+ public void touch() {
+ lastAccess = System.currentTimeMillis();
+ referenceCount++;
+ }
- public void untouch() {
- lastAccess = System.currentTimeMillis();
- referenceCount--;
- }
+ public void untouch() {
+ lastAccess = System.currentTimeMillis();
+ referenceCount--;
+ }
- @Override
- public int compareTo(IndexInfo i) {
- // sort by (isOpen, referenceCount, lastAccess) ascending, where
- // true < false
- //
- // Example sort order:
- // -------------------
- // (F, 0, 70) <-- largest
- // (F, 0, 60)
- // (T, 10, 80)
- // (T, 10, 70)
- // (T, 9, 90)
- // (T, 0, 100) <-- smallest
- if (isOpen && !i.isOpen) {
- return -1;
- } else if (!isOpen && i.isOpen) {
- return 1;
- } else {
- if (referenceCount < i.referenceCount) {
- return -1;
- } else if (referenceCount > i.referenceCount) {
- return 1;
- } else {
- if (lastAccess < i.lastAccess) {
- return -1;
- } else if (lastAccess > i.lastAccess) {
- return 1;
- } else {
- return 0;
- }
- }
- }
+ @Override
+ public int compareTo(IndexInfo i) {
+ // sort by (isOpen, referenceCount, lastAccess) ascending, where
+ // true < false
+ //
+ // Example sort order:
+ // -------------------
+ // (F, 0, 70) <-- largest
+ // (F, 0, 60)
+ // (T, 10, 80)
+ // (T, 10, 70)
+ // (T, 9, 90)
+ // (T, 0, 100) <-- smallest
+ if (isOpen && !i.isOpen) {
+ return -1;
+ } else if (!isOpen && i.isOpen) {
+ return 1;
+ } else {
+ if (referenceCount < i.referenceCount) {
+ return -1;
+ } else if (referenceCount > i.referenceCount) {
+ return 1;
+ } else {
+ if (lastAccess < i.lastAccess) {
+ return -1;
+ } else if (lastAccess > i.lastAccess) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ }
- }
+ }
- public String toString() {
- return "{index: " + index + ", isOpen: " + isOpen + ", refCount: "
- + referenceCount + ", lastAccess: " + lastAccess + "}";
- }
- }
+ public String toString() {
+ return "{index: " + index + ", isOpen: " + isOpen + ", refCount: " + referenceCount + ", lastAccess: "
+ + lastAccess + "}";
+ }
+ }
- @Override
- public List<IIndex> getOpenIndexes() {
- List<IIndex> openIndexes = new ArrayList<IIndex>();
- for (IndexInfo i : indexInfos.values()) {
- if (i.isOpen) {
- openIndexes.add(i.index);
- }
- }
- return openIndexes;
- }
+ @Override
+ public List<IIndex> getOpenIndexes() {
+ List<IIndex> openIndexes = new ArrayList<IIndex>();
+ for (IndexInfo i : indexInfos.values()) {
+ if (i.isOpen) {
+ openIndexes.add(i.index);
+ }
+ }
+ return openIndexes;
+ }
- @Override
- public void start() {
- // TODO Auto-generated method stub
+ @Override
+ public void start() {
+ }
- }
+ @Override
+ public void stop(boolean dumpState, OutputStream outputStream) throws IOException {
+ if (dumpState) {
+ dumpState(outputStream);
+ }
- @Override
- public void stop(boolean dumpState, OutputStream ouputStream) {
- // TODO Auto-generated method stub
+ for (IndexInfo i : indexInfos.values()) {
+ if (i.isOpen) {
+ i.index.deactivate();
+ }
+ }
+ }
- }
+ private void dumpState(OutputStream os) throws IOException {
+ DataOutputStream dos = new DataOutputStream(os);
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(String.format("Memory budget = %d\n", memoryBudget));
+ sb.append(String.format("Memory used = %d\n", memoryUsed));
+
+ String headerFormat = "%-20s %-10s %-20s %-20s %-20s\n";
+ String rowFormat = "%-20d %-10b %-20d %-20s %-20s\n";
+ sb.append(String.format(headerFormat, "ResourceID", "Open", "Reference Count", "Last Access", "Index Name"));
+ IndexInfo ii;
+ for (Map.Entry<Long, IndexInfo> entry : indexInfos.entrySet()) {
+ ii = entry.getValue();
+ sb.append(String.format(rowFormat, entry.getKey(), ii.isOpen, ii.referenceCount, ii.lastAccess, ii.index));
+ }
+ dos.writeUTF(sb.toString());
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java
index 99fdb2e..66d8ec2 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.freepage;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -204,16 +203,4 @@
public IFileMapProvider getFileMapProvider() {
return fileMapManager;
}
-
- @Override
- public void start() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void stop(boolean dumpState, OutputStream ouputStream) {
- // TODO Auto-generated method stub
-
- }
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index f8195ab..934575c 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -14,10 +14,15 @@
*/
package edu.uci.ics.hyracks.storage.common.buffercache;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -31,10 +36,11 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IFileHandle;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
-public class BufferCache implements IBufferCacheInternal {
+public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
private static final Logger LOGGER = Logger.getLogger(BufferCache.class.getName());
private static final int MAP_FACTOR = 2;
@@ -46,7 +52,6 @@
private final IIOManager ioManager;
private final int pageSize;
private final int numPages;
- private final CachedPage[] cachedPages;
private final CacheBucket[] pageMap;
private final IPageReplacementStrategy pageReplacementStrategy;
private final IPageCleanerPolicy pageCleanerPolicy;
@@ -54,31 +59,34 @@
private final CleanerThread cleanerThread;
private final Map<Integer, BufferedFileHandle> fileInfoMap;
+ private CachedPage[] cachedPages;
+
private boolean closed;
public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
IPageReplacementStrategy pageReplacementStrategy, IPageCleanerPolicy pageCleanerPolicy,
- IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles) {
+ IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles, ThreadFactory threadFactory) {
this.ioManager = ioManager;
this.pageSize = pageSize;
this.numPages = numPages;
this.maxOpenFiles = maxOpenFiles;
pageReplacementStrategy.setBufferCache(this);
+ pageMap = new CacheBucket[numPages * MAP_FACTOR];
+ for (int i = 0; i < pageMap.length; ++i) {
+ pageMap[i] = new CacheBucket();
+ }
ByteBuffer[] buffers = allocator.allocate(pageSize, numPages);
cachedPages = new CachedPage[buffers.length];
for (int i = 0; i < buffers.length; ++i) {
cachedPages[i] = new CachedPage(i, buffers[i], pageReplacementStrategy);
}
- pageMap = new CacheBucket[numPages * MAP_FACTOR];
- for (int i = 0; i < pageMap.length; ++i) {
- pageMap[i] = new CacheBucket();
- }
this.pageReplacementStrategy = pageReplacementStrategy;
this.pageCleanerPolicy = pageCleanerPolicy;
this.fileMapManager = fileMapManager;
+ Executor executor = Executors.newCachedThreadPool(threadFactory);
fileInfoMap = new HashMap<Integer, BufferedFileHandle>();
cleanerThread = new CleanerThread();
- cleanerThread.start();
+ executor.execute(cleanerThread);
closed = false;
}
@@ -791,13 +799,14 @@
@Override
public void start() {
- // TODO Auto-generated method stub
-
+ // no op
}
@Override
- public void stop(boolean dumpState, OutputStream ouputStream) {
- // TODO Auto-generated method stub
-
+ public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
+ if (dumpState) {
+ new DataOutputStream(ouputStream).writeUTF(dumpState());
+ }
+ close();
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
index 0a43b1f..df8cc26 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.storage.common.buffercache;
-import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -157,25 +156,13 @@
return closeFileCount.get();
}
- @Override
- public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
- bufferCache.flushDirtyPage(page);
- }
-
- @Override
- public void force(int fileId, boolean metadata) throws HyracksDataException {
- bufferCache.force(fileId, metadata);
- }
-
@Override
- public void start() {
- // TODO Auto-generated method stub
-
+ public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+ bufferCache.flushDirtyPage(page);
}
@Override
- public void stop(boolean dumpState, OutputStream ouputStream) {
- // TODO Auto-generated method stub
-
+ public void force(int fileId, boolean metadata) throws HyracksDataException {
+ bufferCache.force(fileId, metadata);
}
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
index 3c9882f..8b86a91 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
@@ -16,9 +16,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
-public interface IBufferCache extends ILifeCycleComponent {
+public interface IBufferCache {
public void createFile(FileReference fileRef) throws HyracksDataException;
public void openFile(int fileId) throws HyracksDataException;
@@ -26,7 +25,7 @@
public void closeFile(int fileId) throws HyracksDataException;
public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException;
-
+
public ICachedPage tryPin(long dpid) throws HyracksDataException;
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException;
@@ -34,9 +33,9 @@
public void unpin(ICachedPage page) throws HyracksDataException;
public void flushDirtyPage(ICachedPage page) throws HyracksDataException;
-
+
public void force(int fileId, boolean metadata) throws HyracksDataException;
-
+
public int getPageSize();
public int getNumPages();
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
index 285ab1f..842c0d1 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.test.support;
import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
@@ -70,9 +71,21 @@
return null;
}
- @Override
- public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
- // TODO Auto-generated method stub
- return null;
- }
+ @Override
+ public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ThreadFactory getThreadFactory() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void setThreadFactory(ThreadFactory threadFactory) {
+ // TODO Auto-generated method stub
+
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
index fdb2100..2c6aee5 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -52,6 +53,11 @@
private static int pageSize;
private static int numPages;
private static int maxOpenFiles;
+ private final static ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+ };
public static void init(int pageSize, int numPages, int maxOpenFiles) {
TestStorageManagerComponentHolder.pageSize = pageSize;
@@ -76,7 +82,7 @@
IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
IFileMapProvider fileMapProvider = getFileMapProvider(ctx);
bufferCache = new BufferCache(ctx.getIOManager(), allocator, prs, new DelayPageCleanerPolicy(1000),
- (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles);
+ (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles, threadFactory);
}
return bufferCache;
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 8d6ab38..92f2c11 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -19,6 +19,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
@@ -57,6 +58,11 @@
private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
private final IOManager ioManager;
private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+ private final ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+ };
public RuntimeContext(INCApplicationContext appCtx) {
fileMapManager = new TransientFileMapManager();
@@ -68,7 +74,8 @@
int numPages = (int) (bufferSize / pageSize);
/** let the buffer cache never flush dirty pages */
bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
- new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000);
+ new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000,
+ threadFactory);
ioManager = (IOManager) appCtx.getRootContext().getIOManager();
lcManager = new IndexLifecycleManager();
localResourceRepository = new TransientLocalResourceRepository();