Cleaned up some synchronization
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@907 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index bcc3c30..935ce88 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -36,7 +36,6 @@
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -44,6 +43,7 @@
import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
+import edu.uci.ics.hyracks.control.cc.work.ApplicationCreateWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationDestroyWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusConditionVariableWork;
@@ -174,6 +174,14 @@
LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
}
+ public ServerContext getServerContext() {
+ return serverCtx;
+ }
+
+ public ICCContext getCCContext() {
+ return ccContext;
+ }
+
public Map<String, CCApplicationContext> getApplicationMap() {
return applications;
}
@@ -225,9 +233,9 @@
String id = reg.getNodeId();
NodeControllerState state = new NodeControllerState(nodeController, reg);
workQueue.scheduleAndSync(new RegisterNodeWork(this, id, state));
- nodeController.notifyRegistration(this);
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
NodeParameters params = new NodeParameters();
+ params.setClusterController(this);
params.setClusterControllerInfo(info);
params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
params.setProfileDumpPeriod(ccConfig.profileDumpPeriod);
@@ -237,8 +245,7 @@
@Override
public void unregisterNode(INodeController nodeController) throws Exception {
String id = nodeController.getId();
- workQueue.scheduleAndSync(new UnregisterNodeWork(this, id));
- LOGGER.log(Level.INFO, "Unregistered INodeController");
+ workQueue.schedule(new UnregisterNodeWork(this, id));
}
@Override
@@ -295,13 +302,9 @@
@Override
public void createApplication(String appName) throws Exception {
- synchronized (applications) {
- if (applications.containsKey(appName)) {
- throw new HyracksException("Duplicate application with name: " + appName + " being created.");
- }
- CCApplicationContext appCtx = new CCApplicationContext(serverCtx, ccContext, appName);
- applications.put(appName, appCtx);
- }
+ FutureValue<Object> fv = new FutureValue<Object>();
+ workQueue.schedule(new ApplicationCreateWork(this, appName, fv));
+ fv.get();
}
@Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
new file mode 100644
index 0000000..3c8f7d0
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+
+public class ApplicationCreateWork extends AbstractWork {
+ private final ClusterControllerService ccs;
+ private final String appName;
+ private FutureValue<Object> fv;
+
+ public ApplicationCreateWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
+ this.ccs = ccs;
+ this.appName = appName;
+ this.fv = fv;
+ }
+
+ @Override
+ public void run() {
+ Map<String, CCApplicationContext> applications = ccs.getApplicationMap();
+ if (applications.containsKey(appName)) {
+ fv.setException(new HyracksException("Duplicate application with name: " + appName + " being created."));
+ }
+ CCApplicationContext appCtx;
+ try {
+ appCtx = new CCApplicationContext(ccs.getServerContext(), ccs.getCCContext(), appName);
+ } catch (IOException e) {
+ fv.setException(e);
+ return;
+ }
+ applications.put(appName, appCtx);
+ fv.setValue(null);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index fdf49e9..cca9d81 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -40,8 +40,6 @@
public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
- public void notifyRegistration(IClusterController ccs) throws Exception;
-
public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
throws Exception;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
index 0161f96..d67b777 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
@@ -17,16 +17,27 @@
import java.io.Serializable;
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
public class NodeParameters implements Serializable {
private static final long serialVersionUID = 1L;
+ private IClusterController cc;
+
private ClusterControllerInfo ccInfo;
private int heartbeatPeriod;
private int profileDumpPeriod;
+ public IClusterController getClusterController() {
+ return cc;
+ }
+
+ public void setClusterController(IClusterController cc) {
+ this.cc = cc;
+ }
+
public ClusterControllerInfo getClusterControllerInfo() {
return ccInfo;
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index f5ae1f4..36f0c49 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -16,6 +16,8 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -27,15 +29,21 @@
private final WorkerThread thread;
private final Semaphore stopSemaphore;
private boolean stopped;
+ private final AtomicInteger enqueueCount;
+ private final AtomicInteger dequeueCount;
public WorkQueue() {
queue = new LinkedBlockingQueue<AbstractWork>();
thread = new WorkerThread();
stopSemaphore = new Semaphore(1);
+ enqueueCount = new AtomicInteger();
+ dequeueCount = new AtomicInteger();
}
public void start() throws HyracksException {
stopped = false;
+ enqueueCount.set(0);
+ dequeueCount.set(0);
try {
stopSemaphore.acquire();
} catch (InterruptedException e) {
@@ -61,6 +69,10 @@
}
public void schedule(AbstractWork event) {
+ enqueueCount.incrementAndGet();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Enqueue: " + enqueueCount);
+ }
if (LOGGER.isLoggable(event.logLevel())) {
LOGGER.log(event.logLevel(), "Scheduling: " + event);
}
@@ -80,7 +92,7 @@
@Override
public void run() {
try {
- Runnable r;
+ AbstractWork r;
while (true) {
synchronized (WorkQueue.this) {
if (stopped) {
@@ -92,7 +104,14 @@
} catch (InterruptedException e) {
continue;
}
+ dequeueCount.incrementAndGet();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Dequeue: " + dequeueCount + "/" + enqueueCount);
+ }
try {
+ if (LOGGER.isLoggable(r.logLevel())) {
+ LOGGER.log(r.logLevel(), "Executing: " + r);
+ }
r.run();
} catch (Exception e) {
e.printStackTrace();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3fd10a8..8027af5 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -171,6 +171,7 @@
this.nodeParameters = cc.registerNode(new NodeRegistration(this, id, ncConfig, connectionManager
.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
.getAvailableProcessors(), hbSchema));
+ this.ccs = nodeParameters.getClusterController();
queue.start();
heartbeatTask = new HeartbeatTask(cc);
@@ -238,18 +239,13 @@
List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, byte[] ctxVarBytes) throws Exception {
StartTasksWork stw = new StartTasksWork(this, appName, jobId, jagBytes, taskDescriptors, connectorPoliciesMap);
- queue.scheduleAndSync(stw);
+ queue.schedule(stw);
}
@Override
public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
CleanupJobletWork cjw = new CleanupJobletWork(this, jobId, status);
- queue.scheduleAndSync(cjw);
- }
-
- @Override
- public void notifyRegistration(IClusterController ccs) throws Exception {
- this.ccs = ccs;
+ queue.schedule(cjw);
}
@Override
@@ -260,20 +256,24 @@
@Override
public synchronized void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
AbortTasksWork atw = new AbortTasksWork(this, jobId, tasks);
- queue.scheduleAndSync(atw);
+ queue.schedule(atw);
}
@Override
public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
throws Exception {
- CreateApplicationWork caw = new CreateApplicationWork(this, appName, deployHar, serializedDistributedState);
- queue.scheduleAndSync(caw);
+ FutureValue<Object> fv = new FutureValue<Object>();
+ CreateApplicationWork caw = new CreateApplicationWork(this, appName, deployHar, serializedDistributedState, fv);
+ queue.schedule(caw);
+ fv.get();
}
@Override
public void destroyApplication(String appName) throws Exception {
- DestroyApplicationWork daw = new DestroyApplicationWork(this, appName);
- queue.scheduleAndSync(daw);
+ FutureValue<Object> fv = new FutureValue<Object>();
+ DestroyApplicationWork daw = new DestroyApplicationWork(this, appName, fv);
+ queue.schedule(daw);
+ fv.get();
}
@Override
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index dce7c13..7bebb53 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -50,6 +50,15 @@
if (joblet != null) {
joblet.cleanup(status);
}
- ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
+ ncs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
index 120e376..5b60a08 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -29,6 +29,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
@@ -44,41 +45,49 @@
private final byte[] serializedDistributedState;
+ private final FutureValue<Object> fv;
+
public CreateApplicationWork(NodeControllerService ncs, String appName, boolean deployHar,
- byte[] serializedDistributedState) {
+ byte[] serializedDistributedState, FutureValue<Object> fv) {
this.ncs = ncs;
this.appName = appName;
this.deployHar = deployHar;
this.serializedDistributedState = serializedDistributedState;
+ this.fv = fv;
}
@Override
protected void doRun() throws Exception {
- NCApplicationContext appCtx;
- Map<String, NCApplicationContext> applications = ncs.getApplications();
- if (applications.containsKey(appName)) {
- throw new HyracksException("Duplicate application with name: " + appName + " being created.");
- }
- appCtx = new NCApplicationContext(ncs.getServerContext(), ncs.getRootContext(), appName, ncs.getId());
- applications.put(appName, appCtx);
- if (deployHar) {
- NCConfig ncConfig = ncs.getConfiguration();
- NodeParameters nodeParameters = ncs.getNodeParameters();
- HttpClient hc = new DefaultHttpClient();
- HttpGet get = new HttpGet("http://" + ncConfig.ccHost + ":"
- + nodeParameters.getClusterControllerInfo().getWebPort() + "/applications/" + appName);
- HttpResponse response = hc.execute(get);
- InputStream is = response.getEntity().getContent();
- OutputStream os = appCtx.getHarOutputStream();
- try {
- IOUtils.copyLarge(is, os);
- } finally {
- os.close();
- is.close();
+ try {
+ NCApplicationContext appCtx;
+ Map<String, NCApplicationContext> applications = ncs.getApplications();
+ if (applications.containsKey(appName)) {
+ throw new HyracksException("Duplicate application with name: " + appName + " being created.");
}
+ appCtx = new NCApplicationContext(ncs.getServerContext(), ncs.getRootContext(), appName, ncs.getId());
+ applications.put(appName, appCtx);
+ if (deployHar) {
+ NCConfig ncConfig = ncs.getConfiguration();
+ NodeParameters nodeParameters = ncs.getNodeParameters();
+ HttpClient hc = new DefaultHttpClient();
+ HttpGet get = new HttpGet("http://" + ncConfig.ccHost + ":"
+ + nodeParameters.getClusterControllerInfo().getWebPort() + "/applications/" + appName);
+ HttpResponse response = hc.execute(get);
+ InputStream is = response.getEntity().getContent();
+ OutputStream os = appCtx.getHarOutputStream();
+ try {
+ IOUtils.copyLarge(is, os);
+ } finally {
+ os.close();
+ is.close();
+ }
+ }
+ appCtx.initializeClassPath();
+ appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
+ appCtx.initialize();
+ fv.setValue(null);
+ } catch (Exception e) {
+ fv.setException(e);
}
- appCtx.initializeClassPath();
- appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
- appCtx.initialize();
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
index 6a5fbfc..8ac0b5c 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
@@ -18,6 +18,7 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
@@ -29,17 +30,25 @@
private final String appName;
- public DestroyApplicationWork(NodeControllerService ncs, String appName) {
+ private FutureValue<Object> fv;
+
+ public DestroyApplicationWork(NodeControllerService ncs, String appName, FutureValue<Object> fv) {
this.ncs = ncs;
this.appName = appName;
+ this.fv = fv;
}
@Override
protected void doRun() throws Exception {
- Map<String, NCApplicationContext> applications = ncs.getApplications();
- ApplicationContext appCtx = applications.remove(appName);
- if (appCtx != null) {
- appCtx.deinitialize();
+ try {
+ Map<String, NCApplicationContext> applications = ncs.getApplications();
+ ApplicationContext appCtx = applications.remove(appName);
+ if (appCtx != null) {
+ appCtx.deinitialize();
+ }
+ fv.setValue(null);
+ } catch (Exception e) {
+ fv.setException(e);
}
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index 26bf75b..7769539 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -52,7 +52,7 @@
int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
int fh = hashFn
.hash(accessor.getBuffer().array(), startOffset + slotLength + fStart, fEnd - fStart);
- h += fh;
+ h = h * 31 + fh;
}
if (h < 0) {
h = -h;