Refactored internal operator environment object

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1088 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 2532e2a..d2a7977 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -25,7 +25,6 @@
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
 import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
@@ -64,7 +63,7 @@
 
     private final Map<PartitionId, IPartitionCollector> partitionRequestMap;
 
-    private final Map<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>> envMap;
+    private final IOperatorEnvironment env;
 
     private final Map<TaskId, ITaskState> taskStateMap;
 
@@ -88,7 +87,7 @@
         this.jobId = jobId;
         this.jag = jag;
         partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
-        envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+        env = new OperatorEnvironmentImpl(nodeController.getId());
         taskStateMap = new HashMap<TaskId, ITaskState>();
         taskMap = new HashMap<TaskAttemptId, Task>();
         counterMap = new HashMap<String, Counter>();
@@ -106,15 +105,8 @@
         return jag;
     }
 
-    public synchronized IOperatorEnvironment getEnvironment(OperatorDescriptorId opId, int partition) {
-        if (!envMap.containsKey(opId)) {
-            envMap.put(opId, new HashMap<Integer, IOperatorEnvironment>());
-        }
-        Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(opId);
-        if (!opEnvMap.containsKey(partition)) {
-            opEnvMap.put(partition, new OperatorEnvironmentImpl(nodeController.getId()));
-        }
-        return opEnvMap.get(partition);
+    public IOperatorEnvironment getEnvironment() {
+        return env;
     }
 
     public void addTask(Task task) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index daeb4d8..8761c2f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -93,8 +93,7 @@
         fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         counterMap = new HashMap<String, Counter>();
-        opEnv = joblet.getEnvironment(taskId.getTaskId().getActivityId().getOperatorDescriptorId(), taskId.getTaskId()
-                .getPartition());
+        opEnv = joblet.getEnvironment();
         partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>();
         pendingThreads = new LinkedHashSet<Thread>();
         failed = false;