Merge branch 'yingyi/fullstack_fix' of https://code.google.com/p/hyracks into yingyi/fullstack_fix
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 56200e4..4638118 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -111,6 +111,11 @@
             BitSet sourceBitmap);
 
     /**
+     * Indicate whether the connector is an all-producers-to-all-consumers connector
+     */
+    public boolean allProducersToAllConsumers();
+
+    /**
      * Gets the display name.
      */
     public String getDisplayName();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 4d2ad6b..3863eda 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -190,18 +190,29 @@
                     ActivityId ac2 = ac.getConsumerActivity(cdId);
                     Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
                     int nConsumers = ac2TaskStates.length;
-                    for (int i = 0; i < nProducers; ++i) {
-                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
-                                .getTaskId());
-                        if (cInfoList == null) {
-                            cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
-                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
-                        }
-                        for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+                    if (c.allProducersToAllConsumers()) {
+                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+                        for (int j = 0; j < nConsumers; j++) {
                             TaskId targetTID = ac2TaskStates[j].getTaskId();
                             cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
                         }
+                        for (int i = 0; i < nProducers; ++i) {
+                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+                        }
+                    } else {
+                        for (int i = 0; i < nProducers; ++i) {
+                            c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+                            List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
+                                    .getTaskId());
+                            if (cInfoList == null) {
+                                cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+                                taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+                            }
+                            for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+                                TaskId targetTID = ac2TaskStates[j].getTaskId();
+                                cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
+                            }
+                        }
                     }
                 }
             }
@@ -341,9 +352,15 @@
                     int nConsumers = ac2TaskStates.length;
 
                     int[] fanouts = new int[nProducers];
-                    for (int i = 0; i < nProducers; ++i) {
-                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                        fanouts[i] = targetBitmap.cardinality();
+                    if (c.allProducersToAllConsumers()) {
+                        for (int i = 0; i < nProducers; ++i) {
+                            fanouts[i] = nConsumers;
+                        }
+                    } else {
+                        for (int i = 0; i < nProducers; ++i) {
+                            c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+                            fanouts[i] = targetBitmap.cardinality();
+                        }
                     }
                     IConnectorPolicy cp = assignConnectorPolicy(ac, c, nProducers, nConsumers, fanouts);
                     cPolicyMap.put(cdId, cp);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index f12c981..58e12cf 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -87,6 +87,7 @@
     private class WorkerThread extends Thread {
         WorkerThread() {
             setDaemon(true);
+            setPriority(MAX_PRIORITY);
         }
 
         @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 6049a3b..b7fcf7f 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -266,7 +266,7 @@
 
         heartbeatTask = new HeartbeatTask(ccs);
 
-        // Schedule heartbeat generator.
+        // Schedule heartbeat generator. 
         timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
 
         if (nodeParameters.getProfileDumpPeriod() > 0) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 53e5a01..fc64814 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -243,6 +243,7 @@
                                 addPendingThread(thread);
                                 String oldName = thread.getName();
                                 thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+                                thread.setPriority(Thread.MIN_PRIORITY);
                                 try {
                                     pushFrames(collector, writer);
                                 } catch (HyracksDataException e) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
index 30b2482..df4d296 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
@@ -38,4 +38,9 @@
         sourceBitmap.clear();
         sourceBitmap.set(0, nProducerPartitions);
     }
+    
+    @Override
+    public boolean allProducersToAllConsumers(){
+        return true;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index 466fead..20a0ed1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -82,4 +82,9 @@
         sourceBitmap.clear();
         sourceBitmap.set(consumerIndex);
     }
+
+    @Override
+    public boolean allProducersToAllConsumers() {
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index f4b26cb..279de61 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -61,9 +61,9 @@
         ccConfig.defaultMaxJobAttempts = 0;
         ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
-        ccConfig.heartbeatPeriod = 5000;
-        ccConfig.maxHeartbeatLapsePeriods = 8;
-
+        ccConfig.heartbeatPeriod = 1000;
+        ccConfig.maxHeartbeatLapsePeriods = 10;
+        
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
         cc.start();