fix application lifecyle mgmt in hyracks nc
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index a1e866d..b5a2927 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -154,7 +154,7 @@
cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.close();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -212,7 +212,7 @@
writeSearchResults(accessor, i);
}
} catch (Exception e) {
- fail();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -239,6 +239,11 @@
@Override
public void fail() throws HyracksDataException {
+ closeResource();
+ populateFailure();
+ }
+
+ private void closeResource() throws HyracksDataException {
try {
cursor.close();
} catch (Exception e) {
@@ -246,6 +251,9 @@
} finally {
treeIndexOpHelper.close();
}
+ }
+
+ private void populateFailure() throws HyracksDataException {
for (IFrameWriter writer : writers) {
writer.fail();
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index dd20e82..2a7fede 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -185,7 +185,7 @@
cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.close();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -213,7 +213,7 @@
}
}
} catch (Exception e) {
- fail();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -263,6 +263,11 @@
@Override
public void fail() throws HyracksDataException {
+ closeResource();
+ populateFailure();
+ }
+
+ private void closeResource() throws HyracksDataException {
try {
cursor.close();
} catch (Exception e) {
@@ -270,6 +275,9 @@
} finally {
treeIndexOpHelper.close();
}
+ }
+
+ private void populateFailure() throws HyracksDataException {
for (IFrameWriter writer : writers) {
writer.fail();
}
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 6549782..fe27029 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -148,7 +148,7 @@
cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.close();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -181,6 +181,7 @@
}
}
} catch (Exception e) {
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -227,6 +228,17 @@
@Override
public void fail() throws HyracksDataException {
+ closeResource();
+ populateFailure();
+ }
+
+ private void populateFailure() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.fail();
+ }
+ }
+
+ private void closeResource() throws HyracksDataException {
try {
cursor.close();
} catch (Exception e) {
@@ -234,9 +246,6 @@
} finally {
treeIndexOpHelper.close();
}
- for (IFrameWriter writer : writers) {
- writer.fail();
- }
}
/** compare tuples */
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index 9e05bc8..f955831 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -166,7 +166,7 @@
cloneUpdateTb = new ArrayTupleBuilder(index.getFieldCount());
updateBuffer.setFieldCount(index.getFieldCount());
} catch (Exception e) {
- fail();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -206,7 +206,7 @@
writeSearchResults();
}
} catch (Exception e) {
- fail();
+ closeResource();
throw new HyracksDataException(e);
}
}
@@ -233,6 +233,17 @@
@Override
public void fail() throws HyracksDataException {
+ closeResource();
+ populateFailure();
+ }
+
+ private void populateFailure() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.fail();
+ }
+ }
+
+ private void closeResource() throws HyracksDataException {
try {
cursor.close();
} catch (Exception e) {
@@ -240,9 +251,6 @@
} finally {
treeIndexHelper.close();
}
- for (IFrameWriter writer : writers) {
- writer.fail();
- }
}
@Override
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 854e3dc..98219d6 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -48,6 +48,7 @@
public class RuntimeContext implements IWorkspaceFileFactory {
+ private final static int SHUTDOWN_GRACEFUL_PERIOD = 5000;
private final IIndexLifecycleManager lcManager;
private final ILocalResourceRepository localResourceRepository;
private final ResourceIdFactory resourceIdFactory;
@@ -87,11 +88,17 @@
}
public synchronized void close() throws HyracksDataException {
- bufferCache.close();
for (Entry<String, PJobContext> entry : activeJobs.entrySet()) {
entry.getValue().close();
}
activeJobs.clear();
+ // wait a graceful period until all active operators using tree cursors are dead
+ try {
+ wait(SHUTDOWN_GRACEFUL_PERIOD);
+ } catch (InterruptedException e) {
+
+ }
+ bufferCache.close();
}
public ILocalResourceRepository getLocalResourceRepository() {