Fix application lifecycle management in Hyracks NC
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
index ec27653..ad708e9 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
@@ -35,6 +35,7 @@
private final List<ILifeCycleComponent> components;
private boolean stopInitiated;
+ private boolean stopped;
private String dumpPath;
private boolean configured;
@@ -42,6 +43,7 @@
components = new ArrayList<ILifeCycleComponent>();
stopInitiated = false;
configured = false;
+ stopped = false;
}
@Override
@@ -76,6 +78,12 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.severe("Attempting to stop " + this);
}
+ if (stopped) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.severe("Lifecycle management was already stopped");
+ }
+ return;
+ }
if (stopInitiated) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.severe("Stop already in progress");
@@ -124,7 +132,7 @@
}
}
stopInitiated = false;
-
+ stopped = true;
}
@Override
@@ -142,4 +150,8 @@
configured = true;
}
+ public boolean stoppedAll() {
+ return stopped;
+ }
+
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index ebfc486..245d988 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -144,6 +144,8 @@
private final MemoryManager memoryManager;
+ private boolean shuttedDown = false;
+
public NodeControllerService(NCConfig ncConfig) throws Exception {
this.ncConfig = ncConfig;
id = ncConfig.nodeId;
@@ -278,6 +280,9 @@
if (ncAppEntryPoint != null) {
ncAppEntryPoint.notifyStartupComplete();
}
+
+ //add JVM shutdown hook
+ Runtime.getRuntime().addShutdownHook(new JVMShutdownHook(this));
}
private void startApplication() throws Exception {
@@ -294,18 +299,21 @@
}
@Override
- public void stop() throws Exception {
- LOGGER.log(Level.INFO, "Stopping NodeControllerService");
- executor.shutdownNow();
- partitionManager.close();
- datasetPartitionManager.close();
- heartbeatTask.cancel();
- netManager.stop();
- datasetNetworkManager.stop();
- queue.stop();
- if (ncAppEntryPoint != null)
- ncAppEntryPoint.stop();
- LOGGER.log(Level.INFO, "Stopped NodeControllerService");
+ public synchronized void stop() throws Exception {
+ if (!shuttedDown) {
+ LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+ executor.shutdownNow();
+ partitionManager.close();
+ datasetPartitionManager.close();
+ heartbeatTask.cancel();
+ netManager.stop();
+ datasetNetworkManager.stop();
+ queue.stop();
+ if (ncAppEntryPoint != null)
+ ncAppEntryPoint.stop();
+ LOGGER.log(Level.INFO, "Stopped NodeControllerService");
+ shuttedDown = true;
+ }
}
public String getId() {
@@ -527,4 +535,29 @@
public IDatasetPartitionManager getDatasetPartitionManager() {
return datasetPartitionManager;
}
-}
\ No newline at end of file
+
+ /**
+ * Shutdown hook that invokes the {@link NodeControllerService#stop() stop} method.
+ */
+ private static class JVMShutdownHook extends Thread {
+
+ private final NodeControllerService nodeControllerService;
+
+ public JVMShutdownHook(NodeControllerService ncAppEntryPoint) {
+ this.nodeControllerService = ncAppEntryPoint;
+ }
+
+ public void run() {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Shutdown hook in progress");
+ }
+ try {
+ nodeControllerService.stop();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Exception in executing shutdown hook" + e);
+ }
+ }
+ }
+ }
+}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index a1e866d..b5a2927 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -154,7 +154,7 @@
cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.close();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -212,7 +212,7 @@
writeSearchResults(accessor, i);
}
} catch (Exception e) {
- fail();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -239,6 +239,11 @@
@Override
public void fail() throws HyracksDataException {
+ closeResource();
+ populateFailure();
+ }
+
+ private void closeResource() throws HyracksDataException {
try {
cursor.close();
} catch (Exception e) {
@@ -246,6 +251,9 @@
} finally {
treeIndexOpHelper.close();
}
+ }
+
+ private void populateFailure() throws HyracksDataException {
for (IFrameWriter writer : writers) {
writer.fail();
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index dd20e82..2a7fede 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -185,7 +185,7 @@
cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.close();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -213,7 +213,7 @@
}
}
} catch (Exception e) {
- fail();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -263,6 +263,11 @@
@Override
public void fail() throws HyracksDataException {
+ closeResource();
+ populateFailure();
+ }
+
+ private void closeResource() throws HyracksDataException {
try {
cursor.close();
} catch (Exception e) {
@@ -270,6 +275,9 @@
} finally {
treeIndexOpHelper.close();
}
+ }
+
+ private void populateFailure() throws HyracksDataException {
for (IFrameWriter writer : writers) {
writer.fail();
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 6549782..fe27029 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -148,7 +148,7 @@
cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.close();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -181,6 +181,7 @@
}
}
} catch (Exception e) {
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -227,6 +228,17 @@
@Override
public void fail() throws HyracksDataException {
+ closeResource();
+ populateFailure();
+ }
+
+ private void populateFailure() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.fail();
+ }
+ }
+
+ private void closeResource() throws HyracksDataException {
try {
cursor.close();
} catch (Exception e) {
@@ -234,9 +246,6 @@
} finally {
treeIndexOpHelper.close();
}
- for (IFrameWriter writer : writers) {
- writer.fail();
- }
}
/** compare tuples */
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index 9e05bc8..f955831 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -166,7 +166,7 @@
cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
- fail();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -206,7 +206,7 @@
writeSearchResults();
}
} catch (Exception e) {
- fail();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -233,6 +233,17 @@
@Override
public void fail() throws HyracksDataException {
+ closeResource();
+ populateFailure();
+ }
+
+ private void populateFailure() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.fail();
+ }
+ }
+
+ private void closeResource() throws HyracksDataException {
try {
cursor.close();
} catch (Exception e) {
@@ -240,9 +251,6 @@
} finally {
treeIndexHelper.close();
}
- for (IFrameWriter writer : writers) {
- writer.fail();
- }
}
@Override
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 854e3dc..98219d6 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -48,6 +48,7 @@
public class RuntimeContext implements IWorkspaceFileFactory {
+ private final static int SHUTDOWN_GRACEFUL_PERIOD = 5000;
private final IIndexLifecycleManager lcManager;
private final ILocalResourceRepository localResourceRepository;
private final ResourceIdFactory resourceIdFactory;
@@ -87,11 +88,17 @@
}
public synchronized void close() throws HyracksDataException {
- bufferCache.close();
for (Entry<String, PJobContext> entry : activeJobs.entrySet()) {
entry.getValue().close();
}
activeJobs.clear();
+ // wait a graceful period until all active operators using tree cursors are dead
+ try {
+ wait(SHUTDOWN_GRACEFUL_PERIOD);
+ } catch (InterruptedException e) {
+
+ }
+ bufferCache.close();
}
public ILocalResourceRepository getLocalResourceRepository() {