Merge branch 'master' of https://code.google.com/p/hyracks
diff --git a/hyracks/hyracks-client/pom.xml b/hyracks/hyracks-client/pom.xml
index 99ee616..6069d6d 100644
--- a/hyracks/hyracks-client/pom.xml
+++ b/hyracks/hyracks-client/pom.xml
@@ -1,60 +1,114 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>hyracks-client</artifactId>
- <name>hyracks-client</name>
- <parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks</artifactId>
- <version>0.2.10-SNAPSHOT</version>
- </parent>
+<!-- ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License"); ! you may
+ not use this file except in compliance with the License. ! you may obtain
+ a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0
+ ! ! Unless required by applicable law or agreed to in writing, software !
+ distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the
+ License for the specific language governing permissions and ! limitations
+ under the License. ! -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-client</artifactId>
+ <name>hyracks-client</name>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ </parent>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.0.2</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-api</artifactId>
- <version>0.2.10-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-net</artifactId>
- <version>0.2.10-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-comm</artifactId>
- <version>0.2.10-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.10-SNAPSHOT</version>
- </dependency>
- </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.7.2</version>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <argLine>-enableassertions -Xmx2047m -XX:MaxPermSize=300m
+ -Dfile.encoding=UTF-8
+ -Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>.</directory>
+ <includes>
+ <include>teststore*</include>
+ <include>edu*</include>
+ <include>actual*</include>
+ <include>build*</include>
+ <include>expect*</include>
+ <include>ClusterController*</include>
+ <include>edu.uci.*</include>
+ <include>dev*</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-net</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-comm</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/AggregateCounter.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/AggregateCounter.java
new file mode 100644
index 0000000..450004d
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/AggregateCounter.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.client.stats;
+
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+
+public class AggregateCounter extends Counter {
+
+ private long sum = 0;
+ private long numOfItems = 0;
+
+ public AggregateCounter(String name) {
+ super(name);
+ }
+
+ @Override
+ public long set(long value) {
+ long retVal = getRetValue();
+ sum += value;
+ numOfItems++;
+ return retVal;
+ }
+
+ @Override
+ public long get() {
+ long retVal = getRetValue();
+ return retVal;
+ }
+
+ public void reset() {
+ sum = 0;
+ numOfItems = 0;
+ }
+
+ private long getRetValue() {
+ long retVal = 0;
+ if (numOfItems != 0) {
+ retVal = sum / numOfItems;
+ } else {
+ retVal = 0;
+ }
+ return retVal;
+ }
+
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java
new file mode 100644
index 0000000..da30e20
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.client.stats;
+
+public class Counters {
+ public static final String NUM_PROCESSOR = "num-processors";
+
+ public static final String SYSTEM_LOAD = "system-load-averages";
+
+ public static final String MEMORY_USAGE = "heap-used-sizes";
+
+ public static final String NETWORK_IO_READ = "net-payload-bytes-read";
+
+ public static final String NETWORK_IO_WRITE = "net-payload-bytes-written";
+
+ public static final String DISK_READ = "disk-reads";
+
+ public static final String DISK_WRITE = "disk-writes";
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/IClusterCounterContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/IClusterCounterContext.java
new file mode 100644
index 0000000..356ef34
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/IClusterCounterContext.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.client.stats;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+
+public interface IClusterCounterContext extends ICounterContext {
+
+ public ICounter getCounter(String machineName, String counterName, boolean create);
+
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
new file mode 100644
index 0000000..46487eb
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.client.stats.impl;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.client.stats.AggregateCounter;
+import edu.uci.ics.hyracks.client.stats.Counters;
+import edu.uci.ics.hyracks.client.stats.IClusterCounterContext;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+
+/**
+ * @author yingyib
+ */
+public class ClientCounterContext implements IClusterCounterContext {
+ private static String[] RESET_COUNTERS = { Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE,
+ Counters.MEMORY_USAGE, Counters.DISK_READ, Counters.DISK_WRITE, Counters.NUM_PROCESSOR };
+ private static String[] AGG_COUNTERS = { Counters.SYSTEM_LOAD };
+ private static int UPDATE_INTERVAL = 10000;
+
+ private final Map<String, Counter> counterMap = new HashMap<String, Counter>();
+ private final String baseURL;
+ private final List<String> slaveMachines = new ArrayList<String>();
+ private boolean stopped = false;
+ private Thread updateThread = new UpdateThread();
+
+ public ClientCounterContext(String hostName, int restPort, Collection<String> slaveMachines) {
+ this.baseURL = "http://" + hostName + ":" + restPort + "/rest/nodes/";
+ this.slaveMachines.addAll(slaveMachines);
+ for (String restCounterName : RESET_COUNTERS) {
+ counterMap.put(restCounterName, new AggregateCounter(restCounterName));
+ }
+ for (String aggCounterName : AGG_COUNTERS) {
+ counterMap.put(aggCounterName, new AggregateCounter(aggCounterName));
+ }
+ for (String slave : slaveMachines) {
+ for (String restCounterName : RESET_COUNTERS) {
+ counterMap.put(slave + "$" + restCounterName, new AggregateCounter(restCounterName));
+ }
+ for (String aggCounterName : AGG_COUNTERS) {
+ counterMap.put(slave + "$" + aggCounterName, new AggregateCounter(aggCounterName));
+ }
+ }
+ requestCounters();
+ updateThread.start();
+ }
+
+ /**
+ * Reset the counters
+ */
+ public void reset() {
+ for (String aggCounterName : RESET_COUNTERS) {
+ AggregateCounter aggCounter = (AggregateCounter) counterMap.get(aggCounterName);
+ aggCounter.reset();
+ }
+ for (String slave : slaveMachines) {
+ for (String aggCounterName : RESET_COUNTERS) {
+ AggregateCounter aggCounter = (AggregateCounter) counterMap.get(slave + "$" + aggCounterName);
+ aggCounter.reset();
+ }
+ }
+ }
+
+ public void resetAll() {
+ reset();
+ for (String aggCounterName : AGG_COUNTERS) {
+ AggregateCounter aggCounter = (AggregateCounter) counterMap.get(aggCounterName);
+ aggCounter.reset();
+ }
+ for (String slave : slaveMachines) {
+ for (String aggCounterName : AGG_COUNTERS) {
+ AggregateCounter aggCounter = (AggregateCounter) counterMap.get(slave + "$" + aggCounterName);
+ aggCounter.reset();
+ }
+ }
+ }
+
+ @Override
+ public synchronized ICounter getCounter(String name, boolean create) {
+ Counter counter = counterMap.get(name);
+ if (counter == null) {
+ throw new IllegalStateException("request an unknown counter: " + name + "!");
+ }
+ return counter;
+ }
+
+ @Override
+ public ICounter getCounter(String machineName, String counterName, boolean create) {
+ Counter counter = counterMap.get(machineName + "$" + counterName);
+ if (counter == null) {
+ throw new IllegalStateException("request an unknown counter: " + counterName + " on slave machine "
+ + machineName + "!");
+ }
+ return counter;
+ }
+
+ /**
+ * request to each slave machine for all the counters
+ */
+ private synchronized void requestCounters() {
+ try {
+ reset();
+ for (String slave : slaveMachines) {
+ String slaveProfile = requestProfile(slave);
+ JSONParser parser = new JSONParser();
+ JSONObject jo = (JSONObject) parser.parse(slaveProfile);
+ Object counterObject = jo.get("result");
+ if (counterObject instanceof JSONObject) {
+ updateCounterMapWithJSONArray(slave, (JSONObject) counterObject);
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Update counters
+ *
+ * @param jo
+ * the Profile JSON object
+ */
+ private void updateCounterMapWithJSONArray(String slave, JSONObject jo) {
+ for (String counterName : RESET_COUNTERS) {
+ updateCounter(slave, jo, counterName);
+ }
+
+ for (String counterName : AGG_COUNTERS) {
+ updateCounter(slave, jo, counterName);
+ }
+ }
+
+ private void updateCounter(String slave, JSONObject jo, String counterName) {
+ Object counterObject = jo.get(counterName);
+ long counterValue = extractCounterValue(counterObject);
+ // global counter
+ ICounter counter = getCounter(counterName, true);
+ counter.set(counterValue);
+ //local counters
+ ICounter localCounter = getCounter(slave, counterName, true);
+ localCounter.set(counterValue);
+ }
+
+ private long extractCounterValue(Object counterObject) {
+ long counterValue = 0;
+ if (counterObject instanceof JSONArray) {
+ JSONArray jArray = (JSONArray) counterObject;
+ Object[] values = jArray.toArray();
+ for (Object value : values) {
+ if (value instanceof Double) {
+ Double dValue = (Double) value;
+ counterValue += dValue.doubleValue();
+ } else if (value instanceof Long) {
+ Long lValue = (Long) value;
+ counterValue += lValue.longValue();
+ }
+ }
+ counterValue /= values.length;
+ } else {
+ Long val = (Long) counterObject;
+ counterValue = val.longValue();
+ }
+ return counterValue;
+ }
+
+ /**
+ * Request a counter from the slave machine
+ *
+ * @param slaveMachine
+ * @return the JSON string from the slave machine
+ */
+ private String requestProfile(String slaveMachine) {
+ try {
+ String url = baseURL + slaveMachine;
+ URL obj = new URL(url);
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("GET");
+ int responseCode = con.getResponseCode();
+ if (responseCode != 200) {
+ throw new IllegalStateException("The http response code is wrong: " + responseCode);
+ }
+ BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
+ String inputLine;
+ StringBuffer response = new StringBuffer();
+ while ((inputLine = in.readLine()) != null) {
+ response.append(inputLine);
+ }
+ in.close();
+ return response.toString();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Stop the counter profiler
+ */
+ public void stop() {
+ synchronized (updateThread) {
+ stopped = true;
+ updateThread.notifyAll();
+ }
+ try {
+ updateThread.join();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * The thread keep updating counters
+ */
+ private class UpdateThread extends Thread {
+
+ @Override
+ public synchronized void run() {
+ try {
+ while (true) {
+ if (stopped) {
+ break;
+ }
+ requestCounters();
+ wait(UPDATE_INTERVAL);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ notifyAll();
+ }
+ }
+
+ }
+
+}
diff --git a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
new file mode 100644
index 0000000..bbf212f
--- /dev/null
+++ b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.client.stats;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.client.stats.impl.ClientCounterContext;
+
+public class ClientCounterContextTest {
+
+ @Test
+ public void test() throws Exception {
+ HyracksUtils.init();
+ String[] ncs = new String[] { "nc1", "nc2" };
+ ClientCounterContext ccContext = new ClientCounterContext("localhost", 16001, Arrays.asList(ncs));
+ ccContext.resetAll();
+ synchronized (this) {
+ wait(20000);
+ }
+ String[] counters = { Counters.MEMORY_USAGE, Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE,
+ Counters.SYSTEM_LOAD, Counters.NUM_PROCESSOR, Counters.DISK_READ, Counters.DISK_WRITE };
+ for (String counterName : counters) {
+ ICounter counter = ccContext.getCounter(counterName, false);
+ System.out.println(counterName + ": " + counter.get());
+ }
+ for (String slave : ncs) {
+ for (String counterName : counters) {
+ ICounter counter = ccContext.getCounter(slave, counterName, false);
+ System.out.println(slave + " " + counterName + ": " + counter.get());
+ }
+ }
+ ccContext.stop();
+ HyracksUtils.deinit();
+ }
+}
diff --git a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java
new file mode 100644
index 0000000..a58aa9c
--- /dev/null
+++ b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.client.stats;
+
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class HyracksUtils {
+
+ public static final String NC1_ID = "nc1";
+ public static final String NC2_ID = "nc2";
+
+ public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
+ public static final int TEST_HYRACKS_CC_PORT = 1099;
+ public static final int TEST_HYRACKS_CC_CLIENT_PORT = 2099;
+ public static final String CC_HOST = "localhost";
+
+ public static final int FRAME_SIZE = 65536;
+
+ private static ClusterControllerService cc;
+ private static NodeControllerService nc1;
+ private static NodeControllerService nc2;
+ private static IHyracksClientConnection hcc;
+
+ public static void init() throws Exception {
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clientNetIpAddress = CC_HOST;
+ ccConfig.clusterNetIpAddress = CC_HOST;
+ ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
+ ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
+ ccConfig.defaultMaxJobAttempts = 0;
+ ccConfig.jobHistorySize = 0;
+ ccConfig.profileDumpPeriod = -1;
+ ccConfig.heartbeatPeriod = 50;
+
+ // cluster controller
+ cc = new ClusterControllerService(ccConfig);
+ cc.start();
+
+ // two node controllers
+ NCConfig ncConfig1 = new NCConfig();
+ ncConfig1.ccHost = "localhost";
+ ncConfig1.clusterNetIPAddress = "localhost";
+ ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
+ ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.datasetIPAddress = "127.0.0.1";
+ ncConfig1.nodeId = NC1_ID;
+ nc1 = new NodeControllerService(ncConfig1);
+ nc1.start();
+
+ NCConfig ncConfig2 = new NCConfig();
+ ncConfig2.ccHost = "localhost";
+ ncConfig2.clusterNetIPAddress = "localhost";
+ ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
+ ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.datasetIPAddress = "127.0.0.1";
+ ncConfig2.nodeId = NC2_ID;
+ nc2 = new NodeControllerService(ncConfig2);
+ nc2.start();
+
+ // hyracks connection
+ hcc = new HyracksConnection(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
+ }
+
+ public static void deinit() throws Exception {
+ nc2.stop();
+ nc1.stop();
+ cc.stop();
+ }
+
+ public static void runJob(JobSpecification spec, String appName) throws Exception {
+ spec.setFrameSize(FRAME_SIZE);
+ JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ hcc.waitForCompletion(jobId);
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index 85ce104..63fc26b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -125,6 +125,10 @@
private final long[] ipcMessageBytesReceived;
+ private final long[] diskReads;
+
+ private final long[] diskWrites;
+
private int rrdPtr;
private int lastHeartbeatDuration;
@@ -184,6 +188,9 @@
ipcMessagesReceived = new long[RRD_SIZE];
ipcMessageBytesReceived = new long[RRD_SIZE];
+ diskReads = new long[RRD_SIZE];
+ diskWrites = new long[RRD_SIZE];
+
rrdPtr = 0;
}
@@ -219,6 +226,8 @@
ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
+ diskReads[rrdPtr] = hbData.diskReads;
+ diskWrites[rrdPtr] = hbData.diskWrites;
rrdPtr = (rrdPtr + 1) % RRD_SIZE;
}
}
@@ -303,6 +312,8 @@
o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
o.put("ipc-messages-received", ipcMessagesReceived);
o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
+ o.put("disk-reads", diskReads);
+ o.put("disk-writes", diskWrites);
return o;
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index 0724a01..2407c10 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -181,36 +181,44 @@
* @throws HyracksException
*/
private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException {
- try {
- List<URL> downloadedFileURLs = new ArrayList<URL>();
- File dir = new File(deploymentDir);
- if (!dir.exists()) {
- FileUtils.forceMkdir(dir);
- }
- for (URL url : urls) {
- String urlString = url.toString();
- int slashIndex = urlString.lastIndexOf('/');
- String fileName = urlString.substring(slashIndex + 1).split("&")[1];
- String filePath = deploymentDir + File.separator + fileName;
- File targetFile = new File(filePath);
- if (isNC) {
- HttpClient hc = new DefaultHttpClient();
- HttpGet get = new HttpGet(url.toString());
- HttpResponse response = hc.execute(get);
- InputStream is = response.getEntity().getContent();
- OutputStream os = new FileOutputStream(targetFile);
- try {
- IOUtils.copyLarge(is, os);
- } finally {
- os.close();
- is.close();
- }
+ //retry 10 times at maximum for downloading binaries
+ int retryCount = 10;
+ int tried = 0;
+ Exception trace = null;
+ while (tried < retryCount) {
+ try {
+ tried++;
+ List<URL> downloadedFileURLs = new ArrayList<URL>();
+ File dir = new File(deploymentDir);
+ if (!dir.exists()) {
+ FileUtils.forceMkdir(dir);
}
- downloadedFileURLs.add(targetFile.toURI().toURL());
+ for (URL url : urls) {
+ String urlString = url.toString();
+ int slashIndex = urlString.lastIndexOf('/');
+ String fileName = urlString.substring(slashIndex + 1).split("&")[1];
+ String filePath = deploymentDir + File.separator + fileName;
+ File targetFile = new File(filePath);
+ if (isNC) {
+ HttpClient hc = new DefaultHttpClient();
+ HttpGet get = new HttpGet(url.toString());
+ HttpResponse response = hc.execute(get);
+ InputStream is = response.getEntity().getContent();
+ OutputStream os = new FileOutputStream(targetFile);
+ try {
+ IOUtils.copyLarge(is, os);
+ } finally {
+ os.close();
+ is.close();
+ }
+ }
+ downloadedFileURLs.add(targetFile.toURI().toURL());
+ }
+ return downloadedFileURLs;
+ } catch (Exception e) {
+ trace = e;
}
- return downloadedFileURLs;
- } catch (Exception e) {
- throw new HyracksException(e);
}
+ throw new HyracksException(trace);
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index e46449b..e999913 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -45,4 +45,6 @@
public long ipcMessageBytesSent;
public long ipcMessagesReceived;
public long ipcMessageBytesReceived;
+ public long diskReads;
+ public long diskWrites;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 245d988..1cee330 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -66,6 +66,8 @@
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
import edu.uci.ics.hyracks.control.nc.dataset.DatasetPartitionManager;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.profiling.IIOCounter;
+import edu.uci.ics.hyracks.control.nc.io.profiling.IOCounterFactory;
import edu.uci.ics.hyracks.control.nc.net.DatasetNetworkManager;
import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
@@ -145,6 +147,8 @@
private final MemoryManager memoryManager;
private boolean shuttedDown = false;
+
+ private IIOCounter ioCounter;
public NodeControllerService(NCConfig ncConfig) throws Exception {
this.ncConfig = ncConfig;
@@ -173,6 +177,7 @@
registrationPending = true;
getNodeControllerInfosAcceptor = new MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
+ ioCounter = new IOCounterFactory().getIOCounter();
}
public IHyracksRootContext getRootContext() {
@@ -429,6 +434,9 @@
hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
+ hbData.diskReads = ioCounter.getReads();
+ hbData.diskWrites = ioCounter.getWrites();
+
try {
cc.nodeHeartbeat(id, hbData);
} catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IIOCounter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IIOCounter.java
new file mode 100644
index 0000000..6774262
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IIOCounter.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.io.profiling;
+
+public interface IIOCounter {
+
+ /**
+ * @return the number of block reads from the very beginning
+ */
+ public long getReads();
+
+ /**
+ * @return the number of block writes from the very beginning
+ */
+ public long getWrites();
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterDefault.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterDefault.java
new file mode 100644
index 0000000..f53f79c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterDefault.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.io.profiling;
+
+public class IOCounterDefault implements IIOCounter{
+
+ @Override
+ public long getReads() {
+ return 0;
+ }
+
+ @Override
+ public long getWrites() {
+ return 0;
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterFactory.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterFactory.java
new file mode 100644
index 0000000..06ccfaf
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.io.profiling;
+
+public class IOCounterFactory {
+
+ /**
+ * Get the IOCounter for the specific underlying OS
+ *
+ * @return an IIOCounter instance
+ */
+ public IIOCounter getIOCounter() {
+ String osName = System.getProperty("os.name").toLowerCase();
+ if (osName.indexOf("nix") >= 0 || osName.indexOf("nux") >= 0 || osName.indexOf("aix") >= 0) {
+ return new IOCounterLinux();
+ } else if (osName.indexOf("mac") >= 0) {
+ return new IOCounterOSX();
+ } else {
+ return new IOCounterDefault();
+ }
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java
new file mode 100644
index 0000000..1e8baa1
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.io.profiling;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+public class IOCounterLinux implements IIOCounter {
+ public static final String COMMAND = "iostat";
+ public static final String COMMAND2 = "cat /proc/self/io";
+ public static final int PAGE_SIZE = 4096;
+
+ private final long baseReads;
+ private final long baseWrites;
+
+ public IOCounterLinux() {
+ baseReads = getReads();
+ baseWrites = getWrites();
+ }
+
+ @Override
+ public long getReads() {
+ try {
+ long reads = extractColumn(4);
+ return reads - baseReads;
+ } catch (IOException e) {
+ try {
+ long reads = extractRow(4);
+ return reads / PAGE_SIZE;
+ } catch (IOException e2) {
+ return 0;
+ }
+ }
+ }
+
+ @Override
+ public long getWrites() {
+ try {
+ long writes = extractColumn(5);
+ return writes - baseWrites;
+ } catch (IOException e) {
+ try {
+ long writes = extractRow(5);
+ long cancelledWrites = extractRow(6);
+ return (writes - cancelledWrites) / PAGE_SIZE;
+ } catch (IOException e2) {
+ return 0;
+ }
+ }
+ }
+
+ private long extractColumn(int columnIndex) throws IOException {
+ BufferedReader reader = exec(COMMAND);
+ String line = null;
+ boolean device = false;
+ long ios = 0;
+ while ((line = reader.readLine()) != null) {
+ if (line.contains("Blk_read")) {
+ device = true;
+ continue;
+ }
+ if (device == true) {
+ StringTokenizer tokenizer = new StringTokenizer(line);
+ int i = 0;
+ while (tokenizer.hasMoreTokens()) {
+ String column = tokenizer.nextToken();
+ if (i == columnIndex) {
+ ios += Long.parseLong(column);
+ break;
+ }
+ i++;
+ }
+ }
+ }
+ reader.close();
+ return ios;
+ }
+
+ private long extractRow(int rowIndex) throws IOException {
+ BufferedReader reader = exec(COMMAND2);
+ String line = null;
+ long ios = 0;
+ int i = 0;
+ while ((line = reader.readLine()) != null) {
+ if (i == rowIndex) {
+ StringTokenizer tokenizer = new StringTokenizer(line);
+ int j = 0;
+ while (tokenizer.hasMoreTokens()) {
+ String column = tokenizer.nextToken();
+ if (j == 1) {
+ ios = Long.parseLong(column);
+ break;
+ }
+ j++;
+ }
+ }
+ i++;
+ }
+ reader.close();
+ return ios;
+ }
+
+ private BufferedReader exec(String command) throws IOException {
+ Process p = Runtime.getRuntime().exec(command);
+ return new BufferedReader(new InputStreamReader(p.getInputStream()));
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterOSX.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterOSX.java
new file mode 100644
index 0000000..5f56ca8
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterOSX.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.io.profiling;
+
+public class IOCounterOSX implements IIOCounter {
+
+ @Override
+ public long getReads() {
+ return 0;
+ }
+
+ @Override
+ public long getWrites() {
+ return 0;
+ }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 075de03..06e79de 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -80,6 +80,8 @@
public static final String RECOVERY_COUNT = "pregelix.recoveryCount";
/** the checkpoint interval */
public static final String CKP_INTERVAL = "pregelix.ckpinterval";
+ /** the dynamic optimization */
+ public static final String DYNAMIC_OPTIMIZATION = "pregelix.dynamicopt";
/** comma */
public static final String COMMA_STR = ",";
@@ -260,6 +262,15 @@
final public void setCheckpointingInterval(int ckpInterval) {
getConfiguration().setInt(CKP_INTERVAL, ckpInterval);
}
+
+ /**
+ * Indicate if dynamic optimization is enabled
+ *
+ * @param dynamicOpt
+ */
+ final public void setEnableDynamicOptimization(boolean dynamicOpt){
+ getConfiguration().setBoolean(DYNAMIC_OPTIMIZATION, dynamicOpt);
+ }
@Override
public String toString() {
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index a5d9cd7..f44942f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -671,6 +671,16 @@
public static int getRecoveryCount(Configuration conf) {
return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
}
+
+ /***
+ * Get enable dynamic optimization
+ *
+ * @param conf Configuration
+ * @return true if enabled; otherwise false
+ */
+ public static boolean getEnableDynamicOptimization(Configuration conf){
+ return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, true);
+ }
/***
* Get the user-set checkpoint interval
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 3d1699f..9ae263c 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -358,5 +358,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-client</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index d6a6f3d..b3a90e9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -21,6 +21,7 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
@@ -42,21 +43,24 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.stats.Counters;
+import edu.uci.ics.hyracks.client.stats.impl.ClientCounterContext;
import edu.uci.ics.pregelix.api.job.ICheckpointHook;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.base.IDriver;
import edu.uci.ics.pregelix.core.jobgen.JobGen;
-import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
+import edu.uci.ics.pregelix.core.jobgen.JobGenFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.DynamicOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@SuppressWarnings("rawtypes")
public class Driver implements IDriver {
+ public static final String[] COUNTERS = { Counters.NUM_PROCESSOR, Counters.SYSTEM_LOAD, Counters.MEMORY_USAGE,
+ Counters.DISK_READ, Counters.DISK_WRITE, Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE };
private static final Log LOG = LogFactory.getLog(Driver.class);
private IHyracksClientConnection hcc;
private Class exampleClass;
@@ -93,6 +97,8 @@
PregelixJob currentJob = jobs.get(0);
PregelixJob lastJob = currentJob;
addHadoopConfiguration(currentJob, ipAddress, port, true);
+ ClientCounterContext counterContext = new ClientCounterContext(ipAddress, 16001,
+ Arrays.asList(ClusterConfig.getNCNames()));
JobGen jobGen = null;
/** prepare job -- deploy jars */
@@ -105,6 +111,7 @@
int retryCount = 0;
int maxRetryCount = 3;
jobGen = selectJobGen(planChoice, currentJob);
+ IOptimizer dynamicOptimzier = new DynamicOptimizer();
do {
try {
@@ -131,7 +138,10 @@
jobGen.reset(currentJob);
}
- /** run loop-body jobs */
+ /** run loop-body jobs with dynamic optimizer if it is enabled */
+ if (BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration())) {
+ jobGen = dynamicOptimzier.optimize(counterContext, jobGen, i);
+ }
runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
ckpHook, failed);
runClearState(deploymentId, jobGen);
@@ -156,6 +166,13 @@
}
} while (failed && retryCount < maxRetryCount);
LOG.info("job finished");
+ StringBuffer counterBuffer = new StringBuffer();
+ counterBuffer.append("performance counters\n");
+ for (String counter : COUNTERS) {
+ counterBuffer.append("\t" + counter + ": " + counterContext.getCounter(counter, false).get() + "\n");
+ }
+ LOG.info(counterBuffer.toString());
+ counterContext.stop();
} catch (Exception e) {
throw new HyracksException(e);
}
@@ -180,24 +197,7 @@
}
private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
- JobGen jobGen;
- switch (planChoice) {
- case INNER_JOIN:
- jobGen = new JobGenInnerJoin(currentJob);
- break;
- case OUTER_JOIN:
- jobGen = new JobGenOuterJoin(currentJob);
- break;
- case OUTER_JOIN_SORT:
- jobGen = new JobGenOuterJoinSort(currentJob);
- break;
- case OUTER_JOIN_SINGLE_SORT:
- jobGen = new JobGenOuterJoinSingleSort(currentJob);
- break;
- default:
- jobGen = new JobGenInnerJoin(currentJob);
- }
- return jobGen;
+ return JobGenFactory.createJobGen(planChoice, currentJob);
}
private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index f703fcc..c1f6aae 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -31,12 +31,14 @@
import java.util.UUID;
import java.util.logging.Logger;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -93,6 +95,7 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.ReflectionUtils;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.base.IJobGen;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
@@ -110,11 +113,14 @@
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.runtime.function.ExtractLiveVertexIdFunctionFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RecoveryRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
@@ -131,7 +137,7 @@
protected PregelixJob pregelixJob;
protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
- protected String jobId = UUID.randomUUID().toString();
+ protected String jobId = UUID.randomUUID().toString();;
protected int frameSize = ClusterConfig.getFrameSize();
protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
@@ -147,6 +153,13 @@
public JobGen(PregelixJob job) {
init(job);
}
+
+ public JobGen(PregelixJob job, String jobId) {
+ if(jobId!=null){
+ this.jobId = jobId;
+ }
+ init(job);
+ }
private void init(PregelixJob job) {
conf = job.getConfiguration();
@@ -779,4 +792,88 @@
/** generate clean-up job */
public abstract JobSpecification[] generateCleanup() throws HyracksException;
+ /**
+ * Switch the plan to a desired one
+ *
+ * @param iteration
+ * , the latest completed iteration number
+ * @param plan
+ * , plan choice
+ * @return the list of jobspecification for preparing plan switch and the new jobgen
+ */
+ public Pair<List<JobSpecification>, JobGen> switchPlan(int iteration, Plan plan) throws HyracksException {
+ /**
+ * bulk-load a live vertex btree
+ */
+ List<JobSpecification> list = new ArrayList<JobSpecification>();
+ list.add(bulkLoadLiveVertexBTree(iteration));
+ JobGen jobGen = new JobGenInnerJoin(pregelixJob, jobId);
+ return Pair.of(list, jobGen);
+ }
+
+ /**
+ * Build a jobspec to bulkload the live vertex btree
+ *
+ * @param iteration
+ * @return the job specification
+ * @throws HyracksException
+ */
+ private JobSpecification bulkLoadLiveVertexBTree(int iteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct btree search and function call update operator
+ */
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
+
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ conf, vertexIdClass.getName(), vertexClass.getName());
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
+ MsgList.class.getName());
+ TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
+ getIndexDataflowHelperFactory(), inputRdFactory, 1, new ExtractLiveVertexIdFunctionFactory(),
+ preHookFactory, null, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct bulk-load index operator
+ */
+ IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+ int[] fieldPermutation = new int[] { 0, 1 };
+ int[] keyFields = new int[] { 0 };
+ IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+ indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+ WritableComparator.get(vertexIdClass).getClass());
+ TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+ fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+ /** connect job spec */
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, btreeBulkLoad, 0);
+ spec.addRoot(btreeBulkLoad);
+
+ return spec;
+ }
+
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
new file mode 100644
index 0000000..ed580de
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.jobgen;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+
+public class JobGenFactory {
+
+ public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob) {
+ JobGen jobGen = null;
+ switch (planChoice) {
+ case INNER_JOIN:
+ jobGen = new JobGenInnerJoin(currentJob);
+ break;
+ case OUTER_JOIN:
+ jobGen = new JobGenOuterJoin(currentJob);
+ break;
+ case OUTER_JOIN_SORT:
+ jobGen = new JobGenOuterJoinSort(currentJob);
+ break;
+ case OUTER_JOIN_SINGLE_SORT:
+ jobGen = new JobGenOuterJoinSingleSort(currentJob);
+ break;
+ default:
+ jobGen = new JobGenInnerJoin(currentJob);
+ }
+ return jobGen;
+ }
+
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 1bad401..7bdb069 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -96,6 +96,10 @@
public JobGenInnerJoin(PregelixJob job) {
super(job);
}
+
+ public JobGenInnerJoin(PregelixJob job, String jobId) {
+ super(job, jobId);
+ }
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index d01c069..68e6706 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -72,6 +72,10 @@
public JobGenOuterJoin(PregelixJob job) {
super(job);
}
+
+ public JobGenOuterJoin(PregelixJob job, String jobId) {
+ super(job, jobId);
+ }
@Override
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 4480b97..3e4b213 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -72,6 +72,10 @@
public JobGenOuterJoinSingleSort(PregelixJob job) {
super(job);
}
+
+ public JobGenOuterJoinSingleSort(PregelixJob job, String jobId) {
+ super(job, jobId);
+ }
@Override
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 89fbdcd..5c1a4b8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -256,6 +256,10 @@
return locations;
}
+ public static String[] getNCNames() {
+ return NCs;
+ }
+
public static void addToBlackListNodes(Collection<String> nodes) {
blackListNodes.addAll(nodes);
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
new file mode 100644
index 0000000..01fc81b
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+
+public class DynamicOptimizer implements IOptimizer {
+
+ @Override
+ public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration) {
+ return jobGen;
+ }
+
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
new file mode 100644
index 0000000..b5913c4
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+
+public interface IOptimizer {
+
+ public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration);
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
new file mode 100644
index 0000000..ae9463f
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+@SuppressWarnings("rawtypes")
+public class ExtractLiveVertexIdFunctionFactory implements IUpdateFunctionFactory {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IUpdateFunction createFunction() {
+ return new IUpdateFunction() {
+ // for writing intermediate data
+ private final ArrayTupleBuilder alive = new ArrayTupleBuilder(2);
+
+ // for writing out to alive message channel
+ private IFrameWriter writerAlive;
+ private FrameTupleAppender appenderAlive;
+ private ByteBuffer bufferAlive;
+
+ private MsgList dummyMessageList = new MsgList();
+ private Vertex vertex;
+
+ @Override
+ public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+ throws HyracksDataException {
+ this.writerAlive = writers[0];
+ this.bufferAlive = ctx.allocateFrame();
+ this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderAlive.reset(bufferAlive, true);
+ }
+
+ @Override
+ public void process(Object[] tuple) throws HyracksDataException {
+ try {
+ // vertex Id, vertex
+ alive.reset();
+ vertex = (Vertex) tuple[1];
+ if (!vertex.isHalted()) {
+ alive.reset();
+ DataOutput outputAlive = alive.getDataOutput();
+ vertex.getVertexId().write(outputAlive);
+ alive.addFieldEndOffset();
+ dummyMessageList.write(outputAlive);
+ alive.addFieldEndOffset();
+ FrameTupleUtils.flushTuple(appenderAlive, alive, writerAlive);
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+ }
+
+ @Override
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
+
+ }
+ };
+ }
+}