Make all the necessary changes or additions to report the data transfer info across the new result distribution socket for the node controller.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2520 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
index fd9218a..73b5488 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
@@ -27,10 +27,14 @@
 
     private final NetworkAddress netAddress;
 
-    public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress) {
+    private final NetworkAddress datasetNetworkAddress;
+
+    public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress,
+            NetworkAddress datasetNetworkAddress) {
         this.nodeId = nodeId;
         this.status = status;
         this.netAddress = netAddress;
+        this.datasetNetworkAddress = datasetNetworkAddress;
     }
 
     public String getNodeId() {
@@ -44,4 +48,8 @@
     public NetworkAddress getNetworkAddress() {
         return netAddress;
     }
+
+    public NetworkAddress getDatasetNetworkAddress() {
+        return datasetNetworkAddress;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index c17acd0..c96a319 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -41,6 +41,8 @@
 
     private final NetworkAddress dataPort;
 
+    private final NetworkAddress datasetPort;
+
     private final Set<JobId> activeJobIds;
 
     private final String osName;
@@ -107,6 +109,14 @@
 
     private final long[] netSignalingBytesWritten;
 
+    private final long[] datasetNetPayloadBytesRead;
+
+    private final long[] datasetNetPayloadBytesWritten;
+
+    private final long[] datasetNetSignalingBytesRead;
+
+    private final long[] datasetNetSignalingBytesWritten;
+
     private final long[] ipcMessagesSent;
 
     private final long[] ipcMessageBytesSent;
@@ -123,6 +133,7 @@
         this.nodeController = nodeController;
         ncConfig = reg.getNCConfig();
         dataPort = reg.getDataPort();
+        datasetPort = reg.getDatasetPort();
         activeJobIds = new HashSet<JobId>();
 
         osName = reg.getOSName();
@@ -164,6 +175,10 @@
         netPayloadBytesWritten = new long[RRD_SIZE];
         netSignalingBytesRead = new long[RRD_SIZE];
         netSignalingBytesWritten = new long[RRD_SIZE];
+        datasetNetPayloadBytesRead = new long[RRD_SIZE];
+        datasetNetPayloadBytesWritten = new long[RRD_SIZE];
+        datasetNetSignalingBytesRead = new long[RRD_SIZE];
+        datasetNetSignalingBytesWritten = new long[RRD_SIZE];
         ipcMessagesSent = new long[RRD_SIZE];
         ipcMessageBytesSent = new long[RRD_SIZE];
         ipcMessagesReceived = new long[RRD_SIZE];
@@ -196,6 +211,10 @@
         netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
         netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
         netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
+        datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
+        datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
+        datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
+        datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
         ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
         ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
         ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
@@ -227,6 +246,10 @@
         return dataPort;
     }
 
+    public NetworkAddress getDatasetPort() {
+        return datasetPort;
+    }
+
     public JSONObject toSummaryJSON() throws JSONException {
         JSONObject o = new JSONObject();
         o.put("node-id", ncConfig.nodeId);
@@ -271,6 +294,10 @@
         o.put("net-payload-bytes-written", netPayloadBytesWritten);
         o.put("net-signaling-bytes-read", netSignalingBytesRead);
         o.put("net-signaling-bytes-written", netSignalingBytesWritten);
+        o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
+        o.put("dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
+        o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
+        o.put("dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
         o.put("ipc-messages-sent", ipcMessagesSent);
         o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
         o.put("ipc-messages-received", ipcMessagesReceived);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 2f23a2c..a787b9f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -39,7 +39,8 @@
         Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
         Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
         for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
-            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort()));
+            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort(), e
+                    .getValue().getDatasetPort()));
         }
         callback.setValue(result);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
index ff9d8a0..3fc46ff 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
@@ -58,6 +58,10 @@
         var netPayloadBytesWritten = result['net-payload-bytes-written'];
         var netSignalingBytesRead = result['net-signaling-bytes-read'];
         var netSignalingBytesWritten = result['net-signaling-bytes-written'];
+        var datasetNetPayloadBytesRead = result['dataset-net-payload-bytes-read'];
+        var datasetNetPayloadBytesWritten = result['dataset-net-payload-bytes-written'];
+        var datasetNetSignalingBytesRead = result['dataset-net-signaling-bytes-read'];
+        var datasetNetSignalingBytesWritten = result['dataset-net-signaling-bytes-written'];
         var ipcMessagesSent = result['ipc-messages-sent'];
         var ipcMessageBytesSent = result['ipc-message-bytes-sent'];
         var ipcMessagesReceived = result['ipc-messages-received'];
@@ -117,9 +121,13 @@
             }
             if (i < sysLoad.length - 1) {
                 netPayloadReadBWArray.push([ i, computeRate(netPayloadBytesRead, rrdPtr) ]);
+                netPayloadReadBWArray.push([ i, computeRate(datasetNetPayloadBytesRead, rrdPtr) ]);
                 netPayloadWriteBWArray.push([ i, computeRate(netPayloadBytesWritten, rrdPtr) ]);
+                netPayloadWriteBWArray.push([ i, computeRate(datasetNetPayloadBytesWritten, rrdPtr) ]);
                 netSignalingReadBWArray.push([ i, computeRate(netSignalingBytesRead, rrdPtr) ]);
+                netSignalingReadBWArray.push([ i, computeRate(datasetNetSignalingBytesRead, rrdPtr) ]);
                 netSignalingWriteBWArray.push([ i, computeRate(netSignalingBytesWritten, rrdPtr) ]);
+                netSignalingWriteBWArray.push([ i, computeRate(etSignalingBytesWritten, rrdPtr) ]);
                 ipcMessageSendRateArray.push([ i, computeRate(ipcMessagesSent, rrdPtr) ]);
                 ipcMessageBytesSendRateArray.push([ i, computeRate(ipcMessageBytesSent, rrdPtr) ]);
                 ipcMessageReceiveRateArray.push([ i, computeRate(ipcMessagesReceived, rrdPtr) ]);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index 1dba3bc..663c68a 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -37,6 +37,10 @@
     public long netPayloadBytesWritten;
     public long netSignalingBytesRead;
     public long netSignalingBytesWritten;
+    public long datasetNetPayloadBytesRead;
+    public long datasetNetPayloadBytesWritten;
+    public long datasetNetSignalingBytesRead;
+    public long datasetNetSignalingBytesWritten;
     public long ipcMessagesSent;
     public long ipcMessageBytesSent;
     public long ipcMessagesReceived;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3a41e48..2b52182 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -373,6 +373,12 @@
             hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
             hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
 
+            MuxDemuxPerformanceCounters datasetNetPC = datasetNetworkManager.getPerformanceCounters();
+            hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
+            hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
+            hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
+            hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
+
             IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
             hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
             hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();