Cleaned up some synchronization

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@907 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index bcc3c30..935ce88 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -36,7 +36,6 @@
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -44,6 +43,7 @@
 import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
+import edu.uci.ics.hyracks.control.cc.work.ApplicationCreateWork;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationDestroyWork;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobStatusConditionVariableWork;
@@ -174,6 +174,14 @@
         LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
     }
 
+    public ServerContext getServerContext() {
+        return serverCtx;
+    }
+
+    public ICCContext getCCContext() {
+        return ccContext;
+    }
+
     public Map<String, CCApplicationContext> getApplicationMap() {
         return applications;
     }
@@ -225,9 +233,9 @@
         String id = reg.getNodeId();
         NodeControllerState state = new NodeControllerState(nodeController, reg);
         workQueue.scheduleAndSync(new RegisterNodeWork(this, id, state));
-        nodeController.notifyRegistration(this);
         LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
         NodeParameters params = new NodeParameters();
+        params.setClusterController(this);
         params.setClusterControllerInfo(info);
         params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
         params.setProfileDumpPeriod(ccConfig.profileDumpPeriod);
@@ -237,8 +245,7 @@
     @Override
     public void unregisterNode(INodeController nodeController) throws Exception {
         String id = nodeController.getId();
-        workQueue.scheduleAndSync(new UnregisterNodeWork(this, id));
-        LOGGER.log(Level.INFO, "Unregistered INodeController");
+        workQueue.schedule(new UnregisterNodeWork(this, id));
     }
 
     @Override
@@ -295,13 +302,9 @@
 
     @Override
     public void createApplication(String appName) throws Exception {
-        synchronized (applications) {
-            if (applications.containsKey(appName)) {
-                throw new HyracksException("Duplicate application with name: " + appName + " being created.");
-            }
-            CCApplicationContext appCtx = new CCApplicationContext(serverCtx, ccContext, appName);
-            applications.put(appName, appCtx);
-        }
+        FutureValue<Object> fv = new FutureValue<Object>();
+        workQueue.schedule(new ApplicationCreateWork(this, appName, fv));
+        fv.get();
     }
 
     @Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
new file mode 100644
index 0000000..3c8f7d0
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+
+public class ApplicationCreateWork extends AbstractWork {
+    private final ClusterControllerService ccs;
+    private final String appName;
+    private FutureValue<Object> fv;
+
+    public ApplicationCreateWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
+        this.ccs = ccs;
+        this.appName = appName;
+        this.fv = fv;
+    }
+
+    @Override
+    public void run() {
+        Map<String, CCApplicationContext> applications = ccs.getApplicationMap();
+        if (applications.containsKey(appName)) {
+            fv.setException(new HyracksException("Duplicate application with name: " + appName + " being created."));
+        }
+        CCApplicationContext appCtx;
+        try {
+            appCtx = new CCApplicationContext(ccs.getServerContext(), ccs.getCCContext(), appName);
+        } catch (IOException e) {
+            fv.setException(e);
+            return;
+        }
+        applications.put(appName, appCtx);
+        fv.setValue(null);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index fdf49e9..cca9d81 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -40,8 +40,6 @@
 
     public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
 
-    public void notifyRegistration(IClusterController ccs) throws Exception;
-
     public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
             throws Exception;
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
index 0161f96..d67b777 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
@@ -17,16 +17,27 @@
 import java.io.Serializable;
 
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
 
 public class NodeParameters implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    private IClusterController cc;
+
     private ClusterControllerInfo ccInfo;
 
     private int heartbeatPeriod;
 
     private int profileDumpPeriod;
 
+    public IClusterController getClusterController() {
+        return cc;
+    }
+
+    public void setClusterController(IClusterController cc) {
+        this.cc = cc;
+    }
+
     public ClusterControllerInfo getClusterControllerInfo() {
         return ccInfo;
     }
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index f5ae1f4..36f0c49 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -16,6 +16,8 @@
 
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -27,15 +29,21 @@
     private final WorkerThread thread;
     private final Semaphore stopSemaphore;
     private boolean stopped;
+    private final AtomicInteger enqueueCount;
+    private final AtomicInteger dequeueCount;
 
     public WorkQueue() {
         queue = new LinkedBlockingQueue<AbstractWork>();
         thread = new WorkerThread();
         stopSemaphore = new Semaphore(1);
+        enqueueCount = new AtomicInteger();
+        dequeueCount = new AtomicInteger();
     }
 
     public void start() throws HyracksException {
         stopped = false;
+        enqueueCount.set(0);
+        dequeueCount.set(0);
         try {
             stopSemaphore.acquire();
         } catch (InterruptedException e) {
@@ -61,6 +69,10 @@
     }
 
     public void schedule(AbstractWork event) {
+        enqueueCount.incrementAndGet();
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Enqueue: " + enqueueCount);
+        }
         if (LOGGER.isLoggable(event.logLevel())) {
             LOGGER.log(event.logLevel(), "Scheduling: " + event);
         }
@@ -80,7 +92,7 @@
         @Override
         public void run() {
             try {
-                Runnable r;
+                AbstractWork r;
                 while (true) {
                     synchronized (WorkQueue.this) {
                         if (stopped) {
@@ -92,7 +104,14 @@
                     } catch (InterruptedException e) {
                         continue;
                     }
+                    dequeueCount.incrementAndGet();
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("Dequeue: " + dequeueCount + "/" + enqueueCount);
+                    }
                     try {
+                        if (LOGGER.isLoggable(r.logLevel())) {
+                            LOGGER.log(r.logLevel(), "Executing: " + r);
+                        }
                         r.run();
                     } catch (Exception e) {
                         e.printStackTrace();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3fd10a8..8027af5 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -171,6 +171,7 @@
         this.nodeParameters = cc.registerNode(new NodeRegistration(this, id, ncConfig, connectionManager
                 .getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
                 .getAvailableProcessors(), hbSchema));
+        this.ccs = nodeParameters.getClusterController();
         queue.start();
 
         heartbeatTask = new HeartbeatTask(cc);
@@ -238,18 +239,13 @@
             List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, byte[] ctxVarBytes) throws Exception {
         StartTasksWork stw = new StartTasksWork(this, appName, jobId, jagBytes, taskDescriptors, connectorPoliciesMap);
-        queue.scheduleAndSync(stw);
+        queue.schedule(stw);
     }
 
     @Override
     public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
         CleanupJobletWork cjw = new CleanupJobletWork(this, jobId, status);
-        queue.scheduleAndSync(cjw);
-    }
-
-    @Override
-    public void notifyRegistration(IClusterController ccs) throws Exception {
-        this.ccs = ccs;
+        queue.schedule(cjw);
     }
 
     @Override
@@ -260,20 +256,24 @@
     @Override
     public synchronized void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
         AbortTasksWork atw = new AbortTasksWork(this, jobId, tasks);
-        queue.scheduleAndSync(atw);
+        queue.schedule(atw);
     }
 
     @Override
     public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
             throws Exception {
-        CreateApplicationWork caw = new CreateApplicationWork(this, appName, deployHar, serializedDistributedState);
-        queue.scheduleAndSync(caw);
+        FutureValue<Object> fv = new FutureValue<Object>();
+        CreateApplicationWork caw = new CreateApplicationWork(this, appName, deployHar, serializedDistributedState, fv);
+        queue.schedule(caw);
+        fv.get();
     }
 
     @Override
     public void destroyApplication(String appName) throws Exception {
-        DestroyApplicationWork daw = new DestroyApplicationWork(this, appName);
-        queue.scheduleAndSync(daw);
+        FutureValue<Object> fv = new FutureValue<Object>();
+        DestroyApplicationWork daw = new DestroyApplicationWork(this, appName, fv);
+        queue.schedule(daw);
+        fv.get();
     }
 
     @Override
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index dce7c13..7bebb53 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -50,6 +50,15 @@
         if (joblet != null) {
             joblet.cleanup(status);
         }
-        ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
+        ncs.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
index 120e376..5b60a08 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
@@ -44,41 +45,49 @@
 
     private final byte[] serializedDistributedState;
 
+    private final FutureValue<Object> fv;
+
     public CreateApplicationWork(NodeControllerService ncs, String appName, boolean deployHar,
-            byte[] serializedDistributedState) {
+            byte[] serializedDistributedState, FutureValue<Object> fv) {
         this.ncs = ncs;
         this.appName = appName;
         this.deployHar = deployHar;
         this.serializedDistributedState = serializedDistributedState;
+        this.fv = fv;
     }
 
     @Override
     protected void doRun() throws Exception {
-        NCApplicationContext appCtx;
-        Map<String, NCApplicationContext> applications = ncs.getApplications();
-        if (applications.containsKey(appName)) {
-            throw new HyracksException("Duplicate application with name: " + appName + " being created.");
-        }
-        appCtx = new NCApplicationContext(ncs.getServerContext(), ncs.getRootContext(), appName, ncs.getId());
-        applications.put(appName, appCtx);
-        if (deployHar) {
-            NCConfig ncConfig = ncs.getConfiguration();
-            NodeParameters nodeParameters = ncs.getNodeParameters();
-            HttpClient hc = new DefaultHttpClient();
-            HttpGet get = new HttpGet("http://" + ncConfig.ccHost + ":"
-                    + nodeParameters.getClusterControllerInfo().getWebPort() + "/applications/" + appName);
-            HttpResponse response = hc.execute(get);
-            InputStream is = response.getEntity().getContent();
-            OutputStream os = appCtx.getHarOutputStream();
-            try {
-                IOUtils.copyLarge(is, os);
-            } finally {
-                os.close();
-                is.close();
+        try {
+            NCApplicationContext appCtx;
+            Map<String, NCApplicationContext> applications = ncs.getApplications();
+            if (applications.containsKey(appName)) {
+                throw new HyracksException("Duplicate application with name: " + appName + " being created.");
             }
+            appCtx = new NCApplicationContext(ncs.getServerContext(), ncs.getRootContext(), appName, ncs.getId());
+            applications.put(appName, appCtx);
+            if (deployHar) {
+                NCConfig ncConfig = ncs.getConfiguration();
+                NodeParameters nodeParameters = ncs.getNodeParameters();
+                HttpClient hc = new DefaultHttpClient();
+                HttpGet get = new HttpGet("http://" + ncConfig.ccHost + ":"
+                        + nodeParameters.getClusterControllerInfo().getWebPort() + "/applications/" + appName);
+                HttpResponse response = hc.execute(get);
+                InputStream is = response.getEntity().getContent();
+                OutputStream os = appCtx.getHarOutputStream();
+                try {
+                    IOUtils.copyLarge(is, os);
+                } finally {
+                    os.close();
+                    is.close();
+                }
+            }
+            appCtx.initializeClassPath();
+            appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
+            appCtx.initialize();
+            fv.setValue(null);
+        } catch (Exception e) {
+            fv.setException(e);
         }
-        appCtx.initializeClassPath();
-        appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
-        appCtx.initialize();
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
index 6a5fbfc..8ac0b5c 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
@@ -18,6 +18,7 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
@@ -29,17 +30,25 @@
 
     private final String appName;
 
-    public DestroyApplicationWork(NodeControllerService ncs, String appName) {
+    private FutureValue<Object> fv;
+
+    public DestroyApplicationWork(NodeControllerService ncs, String appName, FutureValue<Object> fv) {
         this.ncs = ncs;
         this.appName = appName;
+        this.fv = fv;
     }
 
     @Override
     protected void doRun() throws Exception {
-        Map<String, NCApplicationContext> applications = ncs.getApplications();
-        ApplicationContext appCtx = applications.remove(appName);
-        if (appCtx != null) {
-            appCtx.deinitialize();
+        try {
+            Map<String, NCApplicationContext> applications = ncs.getApplications();
+            ApplicationContext appCtx = applications.remove(appName);
+            if (appCtx != null) {
+                appCtx.deinitialize();
+            }
+            fv.setValue(null);
+        } catch (Exception e) {
+            fv.setException(e);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index 26bf75b..7769539 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -52,7 +52,7 @@
                     int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
                     int fh = hashFn
                             .hash(accessor.getBuffer().array(), startOffset + slotLength + fStart, fEnd - fStart);
-                    h += fh;
+                    h = h * 31 + fh;
                 }
                 if (h < 0) {
                     h = -h;