1. simply activity cluster planner for all-producers-to-all-consumers kind of connectors; 2. lower the NC task threads' priority
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 56200e4..4638118 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -111,6 +111,11 @@
BitSet sourceBitmap);
/**
+ * Indicate whether the connector is an all-producers-to-all-consumers connector
+ */
+ public boolean allProducersToAllConsumers();
+
+ /**
* Gets the display name.
*/
public String getDisplayName();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 4d2ad6b..3863eda 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -190,18 +190,29 @@
ActivityId ac2 = ac.getConsumerActivity(cdId);
Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
int nConsumers = ac2TaskStates.length;
- for (int i = 0; i < nProducers; ++i) {
- c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
- .getTaskId());
- if (cInfoList == null) {
- cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
- taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
- }
- for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+ if (c.allProducersToAllConsumers()) {
+ List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+ for (int j = 0; j < nConsumers; j++) {
TaskId targetTID = ac2TaskStates[j].getTaskId();
cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
}
+ for (int i = 0; i < nProducers; ++i) {
+ taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+ }
+ } else {
+ for (int i = 0; i < nProducers; ++i) {
+ c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+ List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
+ .getTaskId());
+ if (cInfoList == null) {
+ cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+ taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+ }
+ for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+ TaskId targetTID = ac2TaskStates[j].getTaskId();
+ cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
+ }
+ }
}
}
}
@@ -341,9 +352,15 @@
int nConsumers = ac2TaskStates.length;
int[] fanouts = new int[nProducers];
- for (int i = 0; i < nProducers; ++i) {
- c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- fanouts[i] = targetBitmap.cardinality();
+ if (c.allProducersToAllConsumers()) {
+ for (int i = 0; i < nProducers; ++i) {
+ fanouts[i] = nConsumers;
+ }
+ } else {
+ for (int i = 0; i < nProducers; ++i) {
+ c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+ fanouts[i] = targetBitmap.cardinality();
+ }
}
IConnectorPolicy cp = assignConnectorPolicy(ac, c, nProducers, nConsumers, fanouts);
cPolicyMap.put(cdId, cp);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index f12c981..58e12cf 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -87,6 +87,7 @@
private class WorkerThread extends Thread {
WorkerThread() {
setDaemon(true);
+ setPriority(MAX_PRIORITY);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 6049a3b..b7fcf7f 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -266,7 +266,7 @@
heartbeatTask = new HeartbeatTask(ccs);
- // Schedule heartbeat generator.
+ // Schedule heartbeat generator.
timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
if (nodeParameters.getProfileDumpPeriod() > 0) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 53e5a01..fc64814 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -243,6 +243,7 @@
addPendingThread(thread);
String oldName = thread.getName();
thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+ thread.setPriority(Thread.MIN_PRIORITY);
try {
pushFrames(collector, writer);
} catch (HyracksDataException e) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
index 30b2482..df4d296 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
@@ -38,4 +38,9 @@
sourceBitmap.clear();
sourceBitmap.set(0, nProducerPartitions);
}
+
+ @Override
+ public boolean allProducersToAllConsumers(){
+ return true;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index 466fead..20a0ed1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -82,4 +82,9 @@
sourceBitmap.clear();
sourceBitmap.set(consumerIndex);
}
+
+ @Override
+ public boolean allProducersToAllConsumers() {
+ return false;
+ }
}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index f4b26cb..279de61 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -61,9 +61,9 @@
ccConfig.defaultMaxJobAttempts = 0;
ccConfig.jobHistorySize = 1;
ccConfig.profileDumpPeriod = -1;
- ccConfig.heartbeatPeriod = 5000;
- ccConfig.maxHeartbeatLapsePeriods = 8;
-
+ ccConfig.heartbeatPeriod = 1000;
+ ccConfig.maxHeartbeatLapsePeriods = 10;
+
// cluster controller
cc = new ClusterControllerService(ccConfig);
cc.start();