Added INullWriter* classes. Reduced synchronization between Joblet and Stagelet

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_aqua_changes@407 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
new file mode 100644
index 0000000..7552c17
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface INullWriter {
+    public void writeNull(DataOutput out) throws HyracksDataException;
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
new file mode 100644
index 0000000..6d9a744
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INullWriterFactory extends Serializable {
+    public INullWriter createNullWriter();
+}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 6cc7450..413243e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -16,6 +16,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
@@ -65,9 +66,9 @@
         this.appCtx = appCtx;
         this.jobId = jobId;
         this.attempt = attempt;
-        stageletMap = new HashMap<UUID, Stagelet>();
-        envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
-        counterMap = new HashMap<String, Counter>();
+        stageletMap = new Hashtable<UUID, Stagelet>();
+        envMap = new Hashtable<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+        counterMap = new Hashtable<String, Counter>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
     }
@@ -78,8 +79,10 @@
     }
 
     public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
-        if (!envMap.containsKey(hod.getOperatorId())) {
-            envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+        synchronized (envMap) {
+            if (!envMap.containsKey(hod.getOperatorId())) {
+                envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+            }
         }
         Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
         if (!opEnvMap.containsKey(partition)) {
@@ -118,7 +121,7 @@
         return nodeController.getExecutor();
     }
 
-    public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
+    public void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
         stageletMap.remove(stageId);
         nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
     }
@@ -132,15 +135,19 @@
         return nodeController;
     }
 
-    public synchronized void dumpProfile(JobletProfile jProfile) {
-        Map<String, Long> counters = jProfile.getCounters();
-        for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
-            counters.put(e.getKey(), e.getValue().get());
+    public void dumpProfile(JobletProfile jProfile) {
+        synchronized (counterMap) {
+            Map<String, Long> counters = jProfile.getCounters();
+            for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
+                counters.put(e.getKey(), e.getValue().get());
+            }
         }
-        for (Stagelet si : stageletMap.values()) {
-            StageletProfile sProfile = new StageletProfile(si.getStageId());
-            si.dumpProfile(sProfile);
-            jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+        synchronized (stageletMap) {
+            for (Stagelet si : stageletMap.values()) {
+                StageletProfile sProfile = new StageletProfile(si.getStageId());
+                si.dumpProfile(sProfile);
+                jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+            }
         }
     }
 
@@ -193,12 +200,14 @@
     }
 
     @Override
-    public synchronized ICounter getCounter(String name, boolean create) {
-        Counter counter = counterMap.get(name);
-        if (counter == null && create) {
-            counter = new Counter(name);
-            counterMap.put(name, counter);
+    public ICounter getCounter(String name, boolean create) {
+        synchronized (counterMap) {
+            Counter counter = counterMap.get(name);
+            if (counter == null && create) {
+                counter = new Counter(name);
+                counterMap.put(name, counter);
+            }
+            return counter;
         }
-        return counter;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 2702386..4a1c78a 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -129,7 +129,7 @@
         }
         nodeCapability = computeNodeCapability();
         connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
-        jobletMap = new HashMap<UUID, Joblet>();
+        jobletMap = new Hashtable<UUID, Joblet>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
                 NodeControllerService.class.getName()), id));
@@ -438,19 +438,20 @@
         si.setEndpointList(null);
     }
 
-    private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
+    private Joblet getLocalJoblet(UUID jobId) throws Exception {
         Joblet ji = jobletMap.get(jobId);
         return ji;
     }
 
-    private synchronized Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx)
-            throws Exception {
-        Joblet ji = jobletMap.get(jobId);
-        if (ji == null || ji.getAttempt() != attempt) {
-            ji = new Joblet(this, jobId, attempt, appCtx);
-            jobletMap.put(jobId, ji);
+    private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx) throws Exception {
+        synchronized (jobletMap) {
+            Joblet ji = jobletMap.get(jobId);
+            if (ji == null || ji.getAttempt() != attempt) {
+                ji = new Joblet(this, jobId, attempt, appCtx);
+                jobletMap.put(jobId, ji);
+            }
+            return ji;
         }
-        return ji;
     }
 
     public Executor getExecutor() {
@@ -458,7 +459,7 @@
     }
 
     @Override
-    public synchronized void cleanUpJob(UUID jobId) throws Exception {
+    public void cleanUpJob(UUID jobId) throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
@@ -563,7 +564,7 @@
     }
 
     @Override
-    public synchronized void abortJoblet(UUID jobId, int attempt) throws Exception {
+    public void abortJoblet(UUID jobId, int attempt) throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Aborting Job: " + jobId + ":" + attempt);
         }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index eb283c2..594fcae 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -146,9 +146,13 @@
         });
     }
 
-    protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
-        pendingOperators.remove(opIId);
-        if (pendingOperators.isEmpty()) {
+    protected void notifyOperatorCompletion(OperatorInstanceId opIId) {
+        boolean done = false;
+        synchronized (pendingOperators) {
+            pendingOperators.remove(opIId);
+            done = pendingOperators.isEmpty();
+        }
+        if (done) {
             try {
                 StageletProfile sProfile = new StageletProfile(stageId);
                 dumpProfile(sProfile);