cleanup pregelix job context
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java
index 0ddb7d5..ceb085c 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java
@@ -18,8 +18,8 @@
private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
private final Map<TaskIterationID, IStateObject> appStateMap = new ConcurrentHashMap<TaskIterationID, IStateObject>();
- private final Map<String, Long> jobIdToSuperStep = new ConcurrentHashMap<String, Long>();
- private final Map<String, Boolean> jobIdToMove = new ConcurrentHashMap<String, Boolean>();
+ private Long jobIdToSuperStep;
+ private Boolean jobIdToMove;
public void close() throws HyracksDataException {
for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
@@ -30,15 +30,13 @@
appStateMap.clear();
}
- public void clearState(String jobId) throws HyracksDataException {
+ public void clearState() throws HyracksDataException {
for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
for (FileReference fileRef : entry.getValue())
fileRef.delete();
iterationToFiles.clear();
appStateMap.clear();
- jobIdToMove.remove(jobId);
- jobIdToSuperStep.remove(jobId);
}
public Map<TaskIterationID, IStateObject> getAppStateStore() {
@@ -49,51 +47,49 @@
return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
}
- public void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration, ClassLoader cl) {
- Boolean toMove = jobIdToMove.get(jobId);
- if (toMove == null || toMove == true) {
- if (jobIdToSuperStep.get(jobId) == null) {
+ public void setVertexProperties(long numVertices, long numEdges, long currentIteration, ClassLoader cl) {
+ if (jobIdToMove == null || jobIdToMove == true) {
+ if (jobIdToSuperStep == null) {
if (currentIteration <= 0) {
- jobIdToSuperStep.put(jobId, 0L);
+ jobIdToSuperStep = 0L;
} else {
- jobIdToSuperStep.put(jobId, currentIteration);
+ jobIdToSuperStep = currentIteration;
}
}
- long superStep = jobIdToSuperStep.get(jobId);
+ long superStep = jobIdToSuperStep;
List<FileReference> files = iterationToFiles.remove(superStep - 1);
if (files != null) {
for (FileReference fileRef : files)
fileRef.delete();
}
- setProperties(jobId, numVertices, numEdges, currentIteration, superStep, false, cl);
+ setProperties(numVertices, numEdges, currentIteration, superStep, false, cl);
}
System.gc();
}
- public void recoverVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration,
- ClassLoader cl) {
- if (jobIdToSuperStep.get(jobId) == null) {
+ public void recoverVertexProperties(long numVertices, long numEdges, long currentIteration, ClassLoader cl) {
+ if (jobIdToSuperStep == null) {
if (currentIteration <= 0) {
- jobIdToSuperStep.put(jobId, 0L);
+ jobIdToSuperStep = 0L;
} else {
- jobIdToSuperStep.put(jobId, currentIteration);
+ jobIdToSuperStep = currentIteration;
}
}
- long superStep = jobIdToSuperStep.get(jobId);
+ long superStep = jobIdToSuperStep;
List<FileReference> files = iterationToFiles.remove(superStep - 1);
if (files != null) {
for (FileReference fileRef : files)
fileRef.delete();
}
- setProperties(jobId, numVertices, numEdges, currentIteration, superStep, true, cl);
+ setProperties(numVertices, numEdges, currentIteration, superStep, true, cl);
}
- public void endSuperStep(String pregelixJobId) {
- jobIdToMove.put(pregelixJobId, true);
+ public void endSuperStep() {
+ jobIdToMove = true;
LOGGER.info("end iteration " + Vertex.getSuperstep());
}
@@ -101,8 +97,8 @@
return iterationToFiles;
}
- private void setProperties(String jobId, long numVertices, long numEdges, long currentIteration, long superStep,
- boolean toMove, ClassLoader cl) {
+ private void setProperties(long numVertices, long numEdges, long currentIteration, long superStep, boolean toMove,
+ ClassLoader cl) {
try {
Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
Method superStepMethod = vClass.getMethod("setSuperstep", Long.TYPE);
@@ -120,8 +116,8 @@
numVerticesMethod.invoke(null, numVertices);
//Vertex.setNumEdges(numEdges);
numEdgesMethod.invoke(null, numEdges);
- jobIdToSuperStep.put(jobId, superStep);
- jobIdToMove.put(jobId, toMove);
+ jobIdToSuperStep = superStep;
+ jobIdToMove = toMove;
LOGGER.info("start iteration " + Vertex.getSuperstep());
} catch (Exception e) {
throw new IllegalStateException(e);
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index e5f74f3..854e3dc 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -130,23 +130,23 @@
public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration,
ClassLoader cl) {
PJobContext activeJob = getActiveJob(jobId);
- activeJob.setVertexProperties(jobId, numVertices, numEdges, currentIteration, cl);
+ activeJob.setVertexProperties(numVertices, numEdges, currentIteration, cl);
}
public synchronized void recoverVertexProperties(String jobId, long numVertices, long numEdges,
long currentIteration, ClassLoader cl) {
PJobContext activeJob = getActiveJob(jobId);
- activeJob.recoverVertexProperties(jobId, numVertices, numEdges, currentIteration, cl);
+ activeJob.recoverVertexProperties(numVertices, numEdges, currentIteration, cl);
}
public synchronized void endSuperStep(String jobId) {
PJobContext activeJob = getActiveJob(jobId);
- activeJob.endSuperStep(jobId);
+ activeJob.endSuperStep();
}
public synchronized void clearState(String jobId) throws HyracksDataException {
PJobContext activeJob = getActiveJob(jobId);
- activeJob.clearState(jobId);
+ activeJob.clearState();
activeJobs.remove(jobId);
}