Added INullWriter* classes. Reduced synchronization between Joblet and Stagelet
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_aqua_changes@407 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
new file mode 100644
index 0000000..7552c17
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface INullWriter {
+ public void writeNull(DataOutput out) throws HyracksDataException;
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
new file mode 100644
index 0000000..6d9a744
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INullWriterFactory extends Serializable {
+ public INullWriter createNullWriter();
+}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 6cc7450..413243e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
@@ -65,9 +66,9 @@
this.appCtx = appCtx;
this.jobId = jobId;
this.attempt = attempt;
- stageletMap = new HashMap<UUID, Stagelet>();
- envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
- counterMap = new HashMap<String, Counter>();
+ stageletMap = new Hashtable<UUID, Stagelet>();
+ envMap = new Hashtable<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+ counterMap = new Hashtable<String, Counter>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
}
@@ -78,8 +79,10 @@
}
public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
- if (!envMap.containsKey(hod.getOperatorId())) {
- envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+ synchronized (envMap) {
+ if (!envMap.containsKey(hod.getOperatorId())) {
+ envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+ }
}
Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
if (!opEnvMap.containsKey(partition)) {
@@ -118,7 +121,7 @@
return nodeController.getExecutor();
}
- public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
+ public void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
stageletMap.remove(stageId);
nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
}
@@ -132,15 +135,19 @@
return nodeController;
}
- public synchronized void dumpProfile(JobletProfile jProfile) {
- Map<String, Long> counters = jProfile.getCounters();
- for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
- counters.put(e.getKey(), e.getValue().get());
+ public void dumpProfile(JobletProfile jProfile) {
+ synchronized (counterMap) {
+ Map<String, Long> counters = jProfile.getCounters();
+ for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
+ counters.put(e.getKey(), e.getValue().get());
+ }
}
- for (Stagelet si : stageletMap.values()) {
- StageletProfile sProfile = new StageletProfile(si.getStageId());
- si.dumpProfile(sProfile);
- jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+ synchronized (stageletMap) {
+ for (Stagelet si : stageletMap.values()) {
+ StageletProfile sProfile = new StageletProfile(si.getStageId());
+ si.dumpProfile(sProfile);
+ jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+ }
}
}
@@ -193,12 +200,14 @@
}
@Override
- public synchronized ICounter getCounter(String name, boolean create) {
- Counter counter = counterMap.get(name);
- if (counter == null && create) {
- counter = new Counter(name);
- counterMap.put(name, counter);
+ public ICounter getCounter(String name, boolean create) {
+ synchronized (counterMap) {
+ Counter counter = counterMap.get(name);
+ if (counter == null && create) {
+ counter = new Counter(name);
+ counterMap.put(name, counter);
+ }
+ return counter;
}
- return counter;
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 2702386..4a1c78a 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -129,7 +129,7 @@
}
nodeCapability = computeNodeCapability();
connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
- jobletMap = new HashMap<UUID, Joblet>();
+ jobletMap = new Hashtable<UUID, Joblet>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
NodeControllerService.class.getName()), id));
@@ -438,19 +438,20 @@
si.setEndpointList(null);
}
- private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
+ private Joblet getLocalJoblet(UUID jobId) throws Exception {
Joblet ji = jobletMap.get(jobId);
return ji;
}
- private synchronized Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx)
- throws Exception {
- Joblet ji = jobletMap.get(jobId);
- if (ji == null || ji.getAttempt() != attempt) {
- ji = new Joblet(this, jobId, attempt, appCtx);
- jobletMap.put(jobId, ji);
+ private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx) throws Exception {
+ synchronized (jobletMap) {
+ Joblet ji = jobletMap.get(jobId);
+ if (ji == null || ji.getAttempt() != attempt) {
+ ji = new Joblet(this, jobId, attempt, appCtx);
+ jobletMap.put(jobId, ji);
+ }
+ return ji;
}
- return ji;
}
public Executor getExecutor() {
@@ -458,7 +459,7 @@
}
@Override
- public synchronized void cleanUpJob(UUID jobId) throws Exception {
+ public void cleanUpJob(UUID jobId) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Cleaning up after job: " + jobId);
}
@@ -563,7 +564,7 @@
}
@Override
- public synchronized void abortJoblet(UUID jobId, int attempt) throws Exception {
+ public void abortJoblet(UUID jobId, int attempt) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Aborting Job: " + jobId + ":" + attempt);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index eb283c2..594fcae 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -146,9 +146,13 @@
});
}
- protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
- pendingOperators.remove(opIId);
- if (pendingOperators.isEmpty()) {
+ protected void notifyOperatorCompletion(OperatorInstanceId opIId) {
+ boolean done = false;
+ synchronized (pendingOperators) {
+ pendingOperators.remove(opIId);
+ done = pendingOperators.isEmpty();
+ }
+ if (done) {
try {
StageletProfile sProfile = new StageletProfile(stageId);
dumpProfile(sProfile);