Refactored internal operator environment object
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1088 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 2532e2a..d2a7977 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -25,7 +25,6 @@
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
@@ -64,7 +63,7 @@
private final Map<PartitionId, IPartitionCollector> partitionRequestMap;
- private final Map<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>> envMap;
+ private final IOperatorEnvironment env;
private final Map<TaskId, ITaskState> taskStateMap;
@@ -88,7 +87,7 @@
this.jobId = jobId;
this.jag = jag;
partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
- envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+ env = new OperatorEnvironmentImpl(nodeController.getId());
taskStateMap = new HashMap<TaskId, ITaskState>();
taskMap = new HashMap<TaskAttemptId, Task>();
counterMap = new HashMap<String, Counter>();
@@ -106,15 +105,8 @@
return jag;
}
- public synchronized IOperatorEnvironment getEnvironment(OperatorDescriptorId opId, int partition) {
- if (!envMap.containsKey(opId)) {
- envMap.put(opId, new HashMap<Integer, IOperatorEnvironment>());
- }
- Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(opId);
- if (!opEnvMap.containsKey(partition)) {
- opEnvMap.put(partition, new OperatorEnvironmentImpl(nodeController.getId()));
- }
- return opEnvMap.get(partition);
+ public IOperatorEnvironment getEnvironment() {
+ return env;
}
public void addTask(Task task) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index daeb4d8..8761c2f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -93,8 +93,7 @@
fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
deallocatableRegistry = new DefaultDeallocatableRegistry();
counterMap = new HashMap<String, Counter>();
- opEnv = joblet.getEnvironment(taskId.getTaskId().getActivityId().getOperatorDescriptorId(), taskId.getTaskId()
- .getPartition());
+ opEnv = joblet.getEnvironment();
partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>();
pendingThreads = new LinkedHashSet<Thread>();
failed = false;