let system_load be average of a specified period
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/AggregateCounter.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/AggregateCounter.java
new file mode 100644
index 0000000..450004d
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/AggregateCounter.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.client.stats;
+
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+
+public class AggregateCounter extends Counter {
+
+ private long sum = 0;
+ private long numOfItems = 0;
+
+ public AggregateCounter(String name) {
+ super(name);
+ }
+
+ @Override
+ public long set(long value) {
+ long retVal = getRetValue();
+ sum += value;
+ numOfItems++;
+ return retVal;
+ }
+
+ @Override
+ public long get() {
+ long retVal = getRetValue();
+ return retVal;
+ }
+
+ public void reset() {
+ sum = 0;
+ numOfItems = 0;
+ }
+
+ private long getRetValue() {
+ long retVal = 0;
+ if (numOfItems != 0) {
+ retVal = sum / numOfItems;
+ } else {
+ retVal = 0;
+ }
+ return retVal;
+ }
+
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/StatsCollector.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/StatsCollector.java
deleted file mode 100644
index 85d0ca1..0000000
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/StatsCollector.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.client.stats;
-
-import java.util.Collection;
-
-import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
-import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
-
-public class StatsCollector {
-
- public StatsCollector(String hostName, int port, ICounterContext counterContext) {
-
- }
-
- public Collection<ICounter> getStats() {
- return null;
- }
-
-}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/StatsCounter.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/StatsCounter.java
deleted file mode 100644
index 88b6439..0000000
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/StatsCounter.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.client.stats;
-
-import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
-
-public class StatsCounter extends Counter {
-
- public StatsCounter(String name) {
- super(name);
- }
-
- public void aggregate(StatsCounter slaveCounter) {
-
- }
-
-}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
index 79dec7a..532cd1e 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.client.stats.AggregateCounter;
import edu.uci.ics.hyracks.client.stats.Counters;
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
@@ -37,32 +38,65 @@
* @author yingyib
*/
public class ClientCounterContext implements ICounterContext {
- private String[] counters = { Counters.SYSTEM_LOAD, Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE,
+ private static String[] RESET_COUNTERS = { Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE,
Counters.MEMORY_USAGE };
- private Map<String, Counter> counterMap = new HashMap<String, Counter>();
+ private static String[] AGG_COUNTERS = { Counters.SYSTEM_LOAD };
+ private static int UPDATE_INTERVAL = 10000;
+
+ private final Map<String, Counter> counterMap = new HashMap<String, Counter>();
private final String baseURL;
private final List<String> slaveMachines = new ArrayList<String>();
+ private boolean stopped = false;
+ private Thread updateThread = new UpdateThread();
public ClientCounterContext(String hostName, int restPort, Collection<String> slaveMachines) {
this.baseURL = "http://" + hostName + ":" + restPort + "/rest/nodes/";
this.slaveMachines.addAll(slaveMachines);
+ for (String restCounterName : RESET_COUNTERS) {
+ counterMap.put(restCounterName, new AggregateCounter(restCounterName));
+ }
+ for (String aggCounterName : AGG_COUNTERS) {
+ counterMap.put(aggCounterName, new AggregateCounter(aggCounterName));
+ }
requestCounters();
+ updateThread.start();
+ }
+
+ /**
+ * Reset the counters
+ */
+ public void reset() {
+ for (String aggCounterName : RESET_COUNTERS) {
+ AggregateCounter aggCounter = (AggregateCounter) counterMap.get(aggCounterName);
+ aggCounter.reset();
+ }
+ }
+
+ public void resetAll() {
+ reset();
+ for (String aggCounterName : AGG_COUNTERS) {
+ AggregateCounter aggCounter = (AggregateCounter) counterMap.get(aggCounterName);
+ aggCounter.reset();
+ }
}
@Override
- public ICounter getCounter(String name, boolean create) {
+ public synchronized ICounter getCounter(String name, boolean create) {
Counter counter = counterMap.get(name);
- if (counter == null && create) {
- counter = new Counter(name);
- counterMap.put(name, counter);
+ if (counter == null) {
+ throw new IllegalStateException("request an unknown counter: " + name + "!");
}
return counter;
}
- private void requestCounters() {
+ /**
+ * request to each slave machine for all the counters
+ */
+ private synchronized void requestCounters() {
try {
+ reset();
for (String slave : slaveMachines) {
- String slaveProfile = sendGet(slave);
+ String slaveProfile = requestProfile(slave);
JSONParser parser = new JSONParser();
JSONObject jo = (JSONObject) parser.parse(slaveProfile);
updateCounterMap((JSONObject) jo.get("result"));
@@ -72,27 +106,39 @@
}
}
+ /**
+ * Update counters
+ *
+ * @param jo
+ * the Profile JSON object
+ */
private void updateCounterMap(JSONObject jo) {
- for (String counterName : counters) {
+ for (String counterName : RESET_COUNTERS) {
JSONArray jArray = (JSONArray) jo.get(counterName);
Object[] values = jArray.toArray();
long counterValue = 0;
for (Object value : values) {
if (value instanceof Double) {
Double dValue = (Double) value;
- counterValue += dValue.doubleValue() * 100;
+ counterValue += dValue.doubleValue();
} else if (value instanceof Long) {
Long lValue = (Long) value;
counterValue += lValue.longValue();
}
}
- counterValue /= slaveMachines.size();
+ counterValue /= values.length;
ICounter counter = getCounter(counterName, true);
counter.set(counterValue);
}
}
- private String sendGet(String slaveMachine) {
+ /**
+ * Request a counter from the slave machine
+ *
+ * @param slaveMachine
+ * @return the JSON string from the slave machine
+ */
+ private String requestProfile(String slaveMachine) {
try {
String url = baseURL + slaveMachine;
URL obj = new URL(url);
@@ -115,4 +161,43 @@
}
}
+ /**
+ * Stop the counter profiler
+ */
+ public void stop() {
+ synchronized (updateThread) {
+ stopped = true;
+ updateThread.notifyAll();
+ }
+ try {
+ updateThread.join();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * The thread keep updating counters
+ */
+ private class UpdateThread extends Thread {
+
+ @Override
+ public synchronized void run() {
+ try {
+ while (true) {
+ if (stopped) {
+ break;
+ }
+ requestCounters();
+ wait(UPDATE_INTERVAL);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ notifyAll();
+ }
+ }
+
+ }
+
}
diff --git a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
index 9dd1e39..5fe77f2 100644
--- a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
+++ b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
@@ -29,12 +29,17 @@
HyracksUtils.init();
String[] ncs = new String[] { "nc1", "nc2" };
ClientCounterContext ccContext = new ClientCounterContext("localhost", 16001, Arrays.asList(ncs));
+ ccContext.resetAll();
+ synchronized (this) {
+ wait(20000);
+ }
String[] counters = { Counters.MEMORY_USAGE, Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE,
Counters.SYSTEM_LOAD };
for (String counterName : counters) {
ICounter counter = ccContext.getCounter(counterName, false);
System.out.println(counter.get());
}
+ ccContext.stop();
HyracksUtils.deinit();
}
}