Merge branch 'master' into zheilbron/hyracks_asterix_issue470
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
index 6ee719b..54aac5e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.application;
import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
import edu.uci.ics.hyracks.api.messages.IMessageBroker;
@@ -39,4 +40,8 @@
public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer();
+ public ThreadFactory getThreadFactory();
+
+ public void setThreadFactory(ThreadFactory threadFactory);
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
index 2e7feb5..668950b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.Executor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -47,4 +48,6 @@
public void close(IFileHandle fHandle) throws HyracksDataException;
public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
+
+ public void setExecutor(Executor executor);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
new file mode 100644
index 0000000..2cb757f
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.lifecycle;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface ILifeCycleComponent {
+
+ public void start();
+
+ public void stop(boolean dumpState, OutputStream ouputStream) throws IOException;
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java
new file mode 100644
index 0000000..6247c17
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.lifecycle;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Map;
+
+public interface ILifeCycleComponentManager extends UncaughtExceptionHandler {
+
+ public void register(ILifeCycleComponent component);
+
+ public void startAll();
+
+ public void stopAll(boolean dumpState) throws IOException;
+
+ public void configure(Map<String, String> configuration);
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
new file mode 100644
index 0000000..ec27653
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.lifecycle;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class LifeCycleComponentManager implements ILifeCycleComponentManager {
+
+ public final static LifeCycleComponentManager INSTANCE = new LifeCycleComponentManager();
+
+ public static final class Config {
+ public static final String DUMP_PATH_KEY = "DUMP_PATH";
+ }
+
+ private static final Logger LOGGER = Logger.getLogger(LifeCycleComponentManager.class.getName());
+
+ private final List<ILifeCycleComponent> components;
+ private boolean stopInitiated;
+ private String dumpPath;
+ private boolean configured;
+
+ private LifeCycleComponentManager() {
+ components = new ArrayList<ILifeCycleComponent>();
+ stopInitiated = false;
+ configured = false;
+ }
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Uncaught Exception from thread " + t.getName() + " message: " + e.getMessage());
+ e.printStackTrace();
+ }
+ try {
+ stopAll(true);
+ } catch (IOException e1) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Exception in stopping Asterix. " + e1.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public synchronized void register(ILifeCycleComponent component) {
+ components.add(component);
+ }
+
+ @Override
+ public void startAll() {
+ for (ILifeCycleComponent component : components) {
+ component.start();
+ }
+ }
+
+ @Override
+ public synchronized void stopAll(boolean dumpState) throws IOException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.severe("Attempting to stop " + this);
+ }
+ if (stopInitiated) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.severe("Stop already in progress");
+ }
+ return;
+ }
+ if (!configured) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Lifecycle management not configured" + this);
+ }
+ return;
+ }
+
+ stopInitiated = true;
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Stopping Asterix instance");
+ }
+
+ FileOutputStream componentDumpStream = null;
+ String componentDumpPath = null;
+ for (int index = components.size() - 1; index >= 0; index--) {
+ ILifeCycleComponent component = components.get(index);
+ try {
+ if (dumpState) {
+ componentDumpPath = dumpPath + File.separator + component.getClass().getName() + "-coredump";
+ File f = new File(componentDumpPath);
+ File parentDir = new File(f.getParent());
+ if (!parentDir.exists()) {
+ parentDir.mkdirs();
+ }
+ componentDumpStream = new FileOutputStream(f);
+ }
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Stopping component instance" + component.getClass().getName() + " dump state "
+ + dumpState + " dump path " + componentDumpPath);
+ }
+ component.stop(dumpState, componentDumpStream);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Exception in stopping component " + component.getClass().getName() + e.getMessage());
+ }
+ } finally {
+ if (componentDumpStream != null) {
+ componentDumpStream.close();
+ }
+ }
+ }
+ stopInitiated = false;
+
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) {
+ dumpPath = configuration.get(Config.DUMP_PATH_KEY);
+ if (dumpPath == null) {
+ dumpPath = System.getProperty("user.dir");
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("dump path not configured. Using current directory " + dumpPath);
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.severe("LifecycleComponentManager configured " + this);
+ }
+ configured = true;
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5294428..1e78f09 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -122,7 +122,7 @@
private final WorkQueue workQueue;
- private final ExecutorService executor;
+ private ExecutorService executor;
private final Timer timer;
@@ -143,7 +143,6 @@
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
ipAddressNodeNameMap = new HashMap<String, Set<String>>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
- executor = Executors.newCachedThreadPool();
IIPCI ccIPCI = new ClusterControllerIPCI();
clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
new CCNCFunctions.SerializerDeserializer());
@@ -235,6 +234,7 @@
.size()]);
aep.start(appCtx, args);
}
+ executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index 58ae79e..04ee3bd 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -16,6 +16,7 @@
import java.io.IOException;
import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.application.IApplicationContext;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
@@ -28,6 +29,11 @@
protected Serializable distributedState;
protected IMessageBroker messageBroker;
protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer();
+ protected ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+ };
public ApplicationContext(ServerContext serverCtx) throws IOException {
this.serverCtx = serverCtx;
@@ -52,4 +58,14 @@
public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
return this.jobSerDeContainer;
}
+
+ @Override
+ public ThreadFactory getThreadFactory() {
+ return threadFactory;
+ }
+
+ @Override
+ public void setThreadFactory(ThreadFactory threadFactory) {
+ this.threadFactory = threadFactory;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
index d31d09e..489f696 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
@@ -14,11 +14,17 @@
*/
package edu.uci.ics.hyracks.control.nc;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.kohsuke.args4j.CmdLineParser;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
public class NCDriver {
+ private static final Logger LOGGER = Logger.getLogger(NCDriver.class.getName());
+
public static void main(String args[]) throws Exception {
NCConfig ncConfig = new NCConfig();
CmdLineParser cp = new CmdLineParser(ncConfig);
@@ -31,6 +37,10 @@
}
final NodeControllerService nService = new NodeControllerService(ncConfig);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.severe("Setting uncaught exception handler " + LifeCycleComponentManager.INSTANCE);
+ }
+ Thread.currentThread().setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
nService.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index a1c3b08..f82aa37 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -99,9 +99,9 @@
private final NetworkManager netManager;
- private final IDatasetPartitionManager datasetPartitionManager;
+ private IDatasetPartitionManager datasetPartitionManager;
- private final DatasetNetworkManager datasetNetworkManager;
+ private DatasetNetworkManager datasetNetworkManager;
private final WorkQueue queue;
@@ -115,7 +115,7 @@
private final Map<JobId, Joblet> jobletMap;
- private final ExecutorService executor;
+ private ExecutorService executor;
private NodeParameters nodeParameters;
@@ -142,22 +142,17 @@
public NodeControllerService(NCConfig ncConfig) throws Exception {
this.ncConfig = ncConfig;
id = ncConfig.nodeId;
- executor = Executors.newCachedThreadPool();
NodeControllerIPCI ipci = new NodeControllerIPCI();
ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
new CCNCFunctions.SerializerDeserializer());
- this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices), executor));
+
+ this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices)));
if (id == null) {
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
- datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
- ncConfig.resultHistorySize);
- datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
- datasetPartitionManager, ncConfig.nNetThreads);
-
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
timer = new Timer(true);
@@ -219,6 +214,14 @@
fv.setValue(ncInfos);
}
+ private void init() throws Exception {
+ ctx.getIOManager().setExecutor(executor);
+ datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
+ ncConfig.resultHistorySize);
+ datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
+ datasetPartitionManager, ncConfig.nNetThreads);
+ }
+
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting NodeControllerService");
@@ -226,6 +229,7 @@
netManager.start();
startApplication();
+ init();
datasetNetworkManager.start();
IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
@@ -280,6 +284,7 @@
.toArray(new String[ncConfig.appArgs.size()]);
ncAppEntryPoint.start(appCtx, args);
}
+ executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 2ec7acc..6d2c043 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
@@ -61,22 +62,10 @@
partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
private static final long serialVersionUID = 1L;
- protected boolean removeEldestEntry(Map.Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
+ protected boolean removeEldestEntry(Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
synchronized (DatasetPartitionManager.this) {
if (size() > resultHistorySize) {
- Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(eldest.getValue());
- for (ResultSetId rsId : rsIdMap.keySet()) {
- ResultState[] resultStates = rsIdMap.get(rsId);
- if (resultStates != null) {
- for (int i = 0; i < resultStates.length; i++) {
- ResultState state = resultStates[i];
- if (state != null) {
- state.closeAndDelete();
- LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
- }
- }
- }
- }
+ deinitState(eldest);
return true;
}
return false;
@@ -193,7 +182,28 @@
}
@Override
- public void close() {
+ public synchronized void close() {
+ for (Entry<JobId, Map<ResultSetId, ResultState[]>> entry : partitionResultStateMap.entrySet()) {
+ deinitState(entry);
+ }
deallocatableRegistry.close();
}
+
+ public void deinitState(Entry<JobId, Map<ResultSetId, ResultState[]>> entry) {
+ Map<ResultSetId, ResultState[]> rsIdMap = entry.getValue();
+ if (rsIdMap != null) {
+ for (ResultSetId rsId : rsIdMap.keySet()) {
+ ResultState[] resultStates = rsIdMap.get(rsId);
+ if (resultStates != null) {
+ for (int i = 0; i < resultStates.length; i++) {
+ ResultState state = resultStates[i];
+ if (state != null) {
+ state.closeAndDelete();
+ LOGGER.fine("Removing partition: " + i + " for JobId: " + entry.getKey());
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
index 3b13f32..c94953b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
@@ -33,15 +33,19 @@
public class IOManager implements IIOManager {
private final List<IODeviceHandle> ioDevices;
- private final Executor executor;
+ private Executor executor;
private final List<IODeviceHandle> workAreaIODevices;
private int workAreaDeviceIndex;
public IOManager(List<IODeviceHandle> devices, Executor executor) throws HyracksException {
- this.ioDevices = Collections.unmodifiableList(devices);
+ this(devices);
this.executor = executor;
+ }
+
+ public IOManager(List<IODeviceHandle> devices) throws HyracksException {
+ this.ioDevices = Collections.unmodifiableList(devices);
workAreaIODevices = new ArrayList<IODeviceHandle>();
for (IODeviceHandle d : ioDevices) {
if (d.getWorkAreaPath() != null) {
@@ -55,6 +59,10 @@
workAreaDeviceIndex = 0;
}
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
@Override
public List<IODeviceHandle> getIODevices() {
return ioDevices;
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
index df140e3..bfcafe3 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.examples.btree.helper;
+import java.util.concurrent.ThreadFactory;
+
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -42,13 +44,18 @@
private ILocalResourceRepository localResourceRepository;
private IIndexLifecycleManager lcManager;
private ResourceIdFactory resourceIdFactory;
+ private ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+ };
public RuntimeContext(INCApplicationContext appCtx) throws HyracksDataException {
fileMapManager = new TransientFileMapManager();
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
- new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100);
+ new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100, threadFactory);
lcManager = new IndexLifecycleManager();
ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
localResourceRepository = localResourceRepositoryFactory.createRepository();
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
index 4fa0164..120ee77 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
@@ -27,7 +27,7 @@
}
}
- public JobConf getConf() throws HyracksDataException {
+ public synchronized JobConf getConf() throws HyracksDataException {
try {
JobConf conf = new JobConf();
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(confBytes));
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index 197aecc..1c8bd28 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -1,5 +1,7 @@
package edu.uci.ics.hyracks.storage.am.common.dataflow;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -7,10 +9,11 @@
import java.util.Map;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-public class IndexLifecycleManager implements IIndexLifecycleManager {
+public class IndexLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent {
private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100 megabytes
private final Map<Long, IndexInfo> indexInfos;
@@ -29,8 +32,10 @@
}
private boolean evictCandidateIndex() throws HyracksDataException {
- // Why min()? As a heuristic for eviction, we will take an open index (an index consuming memory)
- // that is not being used (refcount == 0) and has been least recently used. The sort order defined
+ // Why min()? As a heuristic for eviction, we will take an open index
+ // (an index consuming memory)
+ // that is not being used (refcount == 0) and has been least recently
+ // used. The sort order defined
// for IndexInfo maintains this. See IndexInfo.compareTo().
IndexInfo info = Collections.min(indexInfos.values());
if (info.referenceCount != 0 || !info.isOpen) {
@@ -130,16 +135,17 @@
@Override
public int compareTo(IndexInfo i) {
- // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
+ // sort by (isOpen, referenceCount, lastAccess) ascending, where
+ // true < false
//
// Example sort order:
// -------------------
- // (F, 0, 70) <-- largest
+ // (F, 0, 70) <-- largest
// (F, 0, 60)
// (T, 10, 80)
// (T, 10, 70)
// (T, 9, 90)
- // (T, 0, 100) <-- smallest
+ // (T, 0, 100) <-- smallest
if (isOpen && !i.isOpen) {
return -1;
} else if (!isOpen && i.isOpen) {
@@ -178,4 +184,38 @@
}
return openIndexes;
}
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream outputStream) throws IOException {
+ if (dumpState) {
+ dumpState(outputStream);
+ }
+
+ for (IndexInfo i : indexInfos.values()) {
+ if (i.isOpen) {
+ i.index.deactivate();
+ }
+ }
+ }
+
+ private void dumpState(OutputStream os) throws IOException {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(String.format("Memory budget = %d\n", memoryBudget));
+ sb.append(String.format("Memory used = %d\n", memoryUsed));
+
+ String headerFormat = "%-20s %-10s %-20s %-20s %-20s\n";
+ String rowFormat = "%-20d %-10b %-20d %-20s %-20s\n";
+ sb.append(String.format(headerFormat, "ResourceID", "Open", "Reference Count", "Last Access", "Index Name"));
+ IndexInfo ii;
+ for (Map.Entry<Long, IndexInfo> entry : indexInfos.entrySet()) {
+ ii = entry.getValue();
+ sb.append(String.format(rowFormat, entry.getKey(), ii.isOpen, ii.referenceCount, ii.lastAccess, ii.index));
+ }
+ os.write(sb.toString().getBytes());
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 03e46f5..6b6c6a1 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -14,9 +14,14 @@
*/
package edu.uci.ics.hyracks.storage.common.buffercache;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -30,10 +35,11 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IFileHandle;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
-public class BufferCache implements IBufferCacheInternal {
+public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
private static final Logger LOGGER = Logger.getLogger(BufferCache.class.getName());
private static final int MAP_FACTOR = 2;
@@ -45,7 +51,6 @@
private final IIOManager ioManager;
private final int pageSize;
private final int numPages;
- private final CachedPage[] cachedPages;
private final CacheBucket[] pageMap;
private final IPageReplacementStrategy pageReplacementStrategy;
private final IPageCleanerPolicy pageCleanerPolicy;
@@ -53,31 +58,34 @@
private final CleanerThread cleanerThread;
private final Map<Integer, BufferedFileHandle> fileInfoMap;
+ private CachedPage[] cachedPages;
+
private boolean closed;
public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
IPageReplacementStrategy pageReplacementStrategy, IPageCleanerPolicy pageCleanerPolicy,
- IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles) {
+ IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles, ThreadFactory threadFactory) {
this.ioManager = ioManager;
this.pageSize = pageSize;
this.numPages = numPages;
this.maxOpenFiles = maxOpenFiles;
pageReplacementStrategy.setBufferCache(this);
+ pageMap = new CacheBucket[numPages * MAP_FACTOR];
+ for (int i = 0; i < pageMap.length; ++i) {
+ pageMap[i] = new CacheBucket();
+ }
ByteBuffer[] buffers = allocator.allocate(pageSize, numPages);
cachedPages = new CachedPage[buffers.length];
for (int i = 0; i < buffers.length; ++i) {
cachedPages[i] = new CachedPage(i, buffers[i], pageReplacementStrategy);
}
- pageMap = new CacheBucket[numPages * MAP_FACTOR];
- for (int i = 0; i < pageMap.length; ++i) {
- pageMap[i] = new CacheBucket();
- }
this.pageReplacementStrategy = pageReplacementStrategy;
this.pageCleanerPolicy = pageCleanerPolicy;
this.fileMapManager = fileMapManager;
+ Executor executor = Executors.newCachedThreadPool(threadFactory);
fileInfoMap = new HashMap<Integer, BufferedFileHandle>();
cleanerThread = new CleanerThread();
- cleanerThread.start();
+ executor.execute(cleanerThread);
closed = false;
}
@@ -137,7 +145,8 @@
pinSanityCheck(dpid);
CachedPage cPage = findPage(dpid, newPage);
if (!newPage) {
- // Resolve race of multiple threads trying to read the page from disk.
+ // Resolve race of multiple threads trying to read the page from
+ // disk.
synchronized (cPage) {
if (!cPage.valid) {
read(cPage);
@@ -157,7 +166,8 @@
CachedPage cPage = null;
/*
- * Hash dpid to get a bucket and then check if the page exists in the bucket.
+ * Hash dpid to get a bucket and then check if the page exists in
+ * the bucket.
*/
int hash = hash(dpid);
CacheBucket bucket = pageMap[hash];
@@ -175,29 +185,38 @@
bucket.bucketLock.unlock();
}
/*
- * If we got here, the page was not in the hash table. Now we ask the page replacement strategy to find us a victim.
+ * If we got here, the page was not in the hash table. Now we ask
+ * the page replacement strategy to find us a victim.
*/
CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
if (victim != null) {
/*
- * We have a victim with the following invariants.
- * 1. The dpid on the CachedPage may or may not be valid.
- * 2. We have a pin on the CachedPage. We have to deal with three cases here.
- * Case 1: The dpid on the CachedPage is invalid (-1). This indicates that this buffer has never been used.
- * So we are the only ones holding it. Get a lock on the required dpid's hash bucket, check if someone inserted
- * the page we want into the table. If so, decrement the pincount on the victim and return the winner page in the
- * table. If such a winner does not exist, insert the victim and return it.
- * Case 2: The dpid on the CachedPage is valid.
- * Case 2a: The current dpid and required dpid hash to the same bucket.
- * Get the bucket lock, check that the victim is still at pinCount == 1 If so check if there is a winning
- * CachedPage with the required dpid. If so, decrement the pinCount on the victim and return the winner.
- * If not, update the contents of the CachedPage to hold the required dpid and return it. If the picCount
- * on the victim was != 1 or CachedPage was dirty someone used the victim for its old contents -- Decrement
- * the pinCount and retry.
- * Case 2b: The current dpid and required dpid hash to different buckets. Get the two bucket locks in the order
- * of the bucket indexes (Ordering prevents deadlocks). Check for the existence of a winner in the new bucket
- * and for potential use of the victim (pinCount != 1). If everything looks good, remove the CachedPage from
- * the old bucket, and add it to the new bucket and update its header with the new dpid.
+ * We have a victim with the following invariants. 1. The dpid
+ * on the CachedPage may or may not be valid. 2. We have a pin
+ * on the CachedPage. We have to deal with three cases here.
+ * Case 1: The dpid on the CachedPage is invalid (-1). This
+ * indicates that this buffer has never been used. So we are the
+ * only ones holding it. Get a lock on the required dpid's hash
+ * bucket, check if someone inserted the page we want into the
+ * table. If so, decrement the pincount on the victim and return
+ * the winner page in the table. If such a winner does not
+ * exist, insert the victim and return it. Case 2: The dpid on
+ * the CachedPage is valid. Case 2a: The current dpid and
+ * required dpid hash to the same bucket. Get the bucket lock,
+ * check that the victim is still at pinCount == 1 If so check
+ * if there is a winning CachedPage with the required dpid. If
+ * so, decrement the pinCount on the victim and return the
+ * winner. If not, update the contents of the CachedPage to hold
+ * the required dpid and return it. If the picCount on the
+ * victim was != 1 or CachedPage was dirty someone used the
+ * victim for its old contents -- Decrement the pinCount and
+ * retry. Case 2b: The current dpid and required dpid hash to
+ * different buckets. Get the two bucket locks in the order of
+ * the bucket indexes (Ordering prevents deadlocks). Check for
+ * the existence of a winner in the new bucket and for potential
+ * use of the victim (pinCount != 1). If everything looks good,
+ * remove the CachedPage from the old bucket, and add it to the
+ * new bucket and update its header with the new dpid.
*/
if (victim.dpid < 0) {
/*
@@ -630,7 +649,8 @@
}
fileInfoMap.remove(entryFileId);
unreferencedFileFound = true;
- // for-each iterator is invalid because we changed fileInfoMap
+ // for-each iterator is invalid because we changed
+ // fileInfoMap
break;
}
}
@@ -764,7 +784,7 @@
} finally {
fileMapManager.unregisterFile(fileId);
if (fInfo != null) {
- // Mark the fInfo as deleted,
+ // Mark the fInfo as deleted,
// such that when its pages are reclaimed in openFile(),
// the pages are not flushed to disk but only invalidated.
if (!fInfo.fileHasBeenDeleted()) {
@@ -775,4 +795,17 @@
}
}
}
+
+ @Override
+ public void start() {
+ // no op
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) throws IOException {
+ if (dumpState) {
+ os.write(dumpState().getBytes());
+ }
+ close();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
index 9392ed9..d61b3f1 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
@@ -25,7 +25,7 @@
public void closeFile(int fileId) throws HyracksDataException;
public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException;
-
+
public ICachedPage tryPin(long dpid) throws HyracksDataException;
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException;
@@ -33,9 +33,9 @@
public void unpin(ICachedPage page) throws HyracksDataException;
public void flushDirtyPage(ICachedPage page) throws HyracksDataException;
-
+
public void force(int fileId, boolean metadata) throws HyracksDataException;
-
+
public int getPageSize();
public int getNumPages();
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
index 285ab1f..842c0d1 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.test.support;
import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
@@ -70,9 +71,21 @@
return null;
}
- @Override
- public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
- // TODO Auto-generated method stub
- return null;
- }
+ @Override
+ public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ThreadFactory getThreadFactory() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void setThreadFactory(ThreadFactory threadFactory) {
+ // TODO Auto-generated method stub
+
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
index fdb2100..2c6aee5 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -52,6 +53,11 @@
private static int pageSize;
private static int numPages;
private static int maxOpenFiles;
+ private final static ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+ };
public static void init(int pageSize, int numPages, int maxOpenFiles) {
TestStorageManagerComponentHolder.pageSize = pageSize;
@@ -76,7 +82,7 @@
IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
IFileMapProvider fileMapProvider = getFileMapProvider(ctx);
bufferCache = new BufferCache(ctx.getIOManager(), allocator, prs, new DelayPageCleanerPolicy(1000),
- (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles);
+ (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles, threadFactory);
}
return bufferCache;
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 0f4afd3..2b32545 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -19,6 +19,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
@@ -57,6 +58,11 @@
private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
private final IOManager ioManager;
private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+ private final ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+ };
public RuntimeContext(INCApplicationContext appCtx) {
fileMapManager = new TransientFileMapManager();
@@ -68,7 +74,8 @@
int numPages = (int) (bufferSize / pageSize);
/** let the buffer cache never flush dirty pages */
bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
- new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000);
+ new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000,
+ threadFactory);
ioManager = (IOManager) appCtx.getRootContext().getIOManager();
lcManager = new IndexLifecycleManager();
localResourceRepository = new TransientLocalResourceRepository();