Merge branch 'master' into yingyi/fullstack_fix
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 56200e4..4638118 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -111,6 +111,11 @@
BitSet sourceBitmap);
/**
+     * Indicates whether this connector is an all-producers-to-all-consumers connector
+ */
+ public boolean allProducersToAllConsumers();
+
+ /**
* Gets the display name.
*/
public String getDisplayName();
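
For context, the new flag lets the job planner treat a connector's routing as dense without probing it one partition at a time. A sketch of the two intended answers, via hypothetical subclasses that are assumed to satisfy the rest of the IConnectorDescriptor contract elsewhere (illustrative, not part of this patch):

    import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;

    // Hypothetical connectors; all other IConnectorDescriptor methods are
    // assumed to be implemented elsewhere.
    abstract class DenseConnectorSketch implements IConnectorDescriptor {
        @Override
        public boolean allProducersToAllConsumers() {
            return true; // every producer partition feeds every consumer partition
        }
    }

    abstract class OneToOneConnectorSketch implements IConnectorDescriptor {
        @Override
        public boolean allProducersToAllConsumers() {
            return false; // producer i feeds only consumer i
        }
    }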
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
index 6390abf..aab59c8 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
@@ -17,6 +17,8 @@
public class HyracksDataException extends HyracksException {
private static final long serialVersionUID = 1L;
+ private String nodeId;
+
public HyracksDataException() {
}
@@ -24,11 +26,19 @@
super(message);
}
+ public HyracksDataException(Throwable cause) {
+ super(cause);
+ }
+
public HyracksDataException(String message, Throwable cause) {
super(message, cause);
}
- public HyracksDataException(Throwable cause) {
- super(cause);
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
}
}
\ No newline at end of file
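
A minimal round trip of the new accessors (illustrative sketch; the node name is made up):

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

    public class NodeIdTagExample {
        public static void main(String[] args) {
            // Wrap a low-level failure and record where it happened.
            HyracksDataException e = new HyracksDataException(new RuntimeException("disk error"));
            e.setNodeId("nc1"); // set on the failing node controller
            // Later, e.g. on the cluster controller, the origin is recoverable:
            System.out.println("failed on node: " + e.getNodeId());
        }
    }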
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index 67ba2b6..3ee036d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -33,276 +33,281 @@
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
public class NodeControllerState {
- private static final int RRD_SIZE = 720;
+ private static final int RRD_SIZE = 720;
- private final INodeController nodeController;
+ private final INodeController nodeController;
- private final NCConfig ncConfig;
+ private final NCConfig ncConfig;
- private final NetworkAddress dataPort;
+ private final NetworkAddress dataPort;
- private final NetworkAddress datasetPort;
+ private final NetworkAddress datasetPort;
- private final Set<JobId> activeJobIds;
+ private final Set<JobId> activeJobIds;
- private final String osName;
+ private final String osName;
- private final String arch;
+ private final String arch;
- private final String osVersion;
+ private final String osVersion;
- private final int nProcessors;
+ private final int nProcessors;
- private final String vmName;
+ private final String vmName;
- private final String vmVersion;
+ private final String vmVersion;
- private final String vmVendor;
+ private final String vmVendor;
- private final String classpath;
+ private final String classpath;
- private final String libraryPath;
+ private final String libraryPath;
- private final String bootClasspath;
+ private final String bootClasspath;
- private final List<String> inputArguments;
+ private final List<String> inputArguments;
- private final Map<String, String> systemProperties;
+ private final Map<String, String> systemProperties;
- private final HeartbeatSchema hbSchema;
+ private final HeartbeatSchema hbSchema;
- private final long[] hbTime;
+ private final long[] hbTime;
- private final long[] heapInitSize;
+ private final long[] heapInitSize;
- private final long[] heapUsedSize;
+ private final long[] heapUsedSize;
- private final long[] heapCommittedSize;
+ private final long[] heapCommittedSize;
- private final long[] heapMaxSize;
+ private final long[] heapMaxSize;
- private final long[] nonheapInitSize;
+ private final long[] nonheapInitSize;
- private final long[] nonheapUsedSize;
+ private final long[] nonheapUsedSize;
- private final long[] nonheapCommittedSize;
+ private final long[] nonheapCommittedSize;
- private final long[] nonheapMaxSize;
+ private final long[] nonheapMaxSize;
- private final int[] threadCount;
+ private final int[] threadCount;
- private final int[] peakThreadCount;
+ private final int[] peakThreadCount;
- private final double[] systemLoadAverage;
+ private final double[] systemLoadAverage;
- private final String[] gcNames;
+ private final String[] gcNames;
- private final long[][] gcCollectionCounts;
+ private final long[][] gcCollectionCounts;
- private final long[][] gcCollectionTimes;
+ private final long[][] gcCollectionTimes;
- private final long[] netPayloadBytesRead;
+ private final long[] netPayloadBytesRead;
- private final long[] netPayloadBytesWritten;
+ private final long[] netPayloadBytesWritten;
- private final long[] netSignalingBytesRead;
+ private final long[] netSignalingBytesRead;
- private final long[] netSignalingBytesWritten;
+ private final long[] netSignalingBytesWritten;
- private final long[] datasetNetPayloadBytesRead;
+ private final long[] datasetNetPayloadBytesRead;
- private final long[] datasetNetPayloadBytesWritten;
+ private final long[] datasetNetPayloadBytesWritten;
- private final long[] datasetNetSignalingBytesRead;
+ private final long[] datasetNetSignalingBytesRead;
- private final long[] datasetNetSignalingBytesWritten;
+ private final long[] datasetNetSignalingBytesWritten;
- private final long[] ipcMessagesSent;
+ private final long[] ipcMessagesSent;
- private final long[] ipcMessageBytesSent;
+ private final long[] ipcMessageBytesSent;
- private final long[] ipcMessagesReceived;
+ private final long[] ipcMessagesReceived;
- private final long[] ipcMessageBytesReceived;
+ private final long[] ipcMessageBytesReceived;
- private int rrdPtr;
+ private int rrdPtr;
- private int lastHeartbeatDuration;
+ private int lastHeartbeatDuration;
- public NodeControllerState(INodeController nodeController, NodeRegistration reg) {
- this.nodeController = nodeController;
- ncConfig = reg.getNCConfig();
- dataPort = reg.getDataPort();
- datasetPort = reg.getDatasetPort();
- activeJobIds = new HashSet<JobId>();
+ public NodeControllerState(INodeController nodeController,
+ NodeRegistration reg) {
+ this.nodeController = nodeController;
+ ncConfig = reg.getNCConfig();
+ dataPort = reg.getDataPort();
+ datasetPort = reg.getDatasetPort();
+ activeJobIds = new HashSet<JobId>();
- osName = reg.getOSName();
- arch = reg.getArch();
- osVersion = reg.getOSVersion();
- nProcessors = reg.getNProcessors();
- vmName = reg.getVmName();
- vmVersion = reg.getVmVersion();
- vmVendor = reg.getVmVendor();
- classpath = reg.getClasspath();
- libraryPath = reg.getLibraryPath();
- bootClasspath = reg.getBootClasspath();
- inputArguments = reg.getInputArguments();
- systemProperties = reg.getSystemProperties();
+ osName = reg.getOSName();
+ arch = reg.getArch();
+ osVersion = reg.getOSVersion();
+ nProcessors = reg.getNProcessors();
+ vmName = reg.getVmName();
+ vmVersion = reg.getVmVersion();
+ vmVendor = reg.getVmVendor();
+ classpath = reg.getClasspath();
+ libraryPath = reg.getLibraryPath();
+ bootClasspath = reg.getBootClasspath();
+ inputArguments = reg.getInputArguments();
+ systemProperties = reg.getSystemProperties();
- hbSchema = reg.getHeartbeatSchema();
+ hbSchema = reg.getHeartbeatSchema();
- hbTime = new long[RRD_SIZE];
- heapInitSize = new long[RRD_SIZE];
- heapUsedSize = new long[RRD_SIZE];
- heapCommittedSize = new long[RRD_SIZE];
- heapMaxSize = new long[RRD_SIZE];
- nonheapInitSize = new long[RRD_SIZE];
- nonheapUsedSize = new long[RRD_SIZE];
- nonheapCommittedSize = new long[RRD_SIZE];
- nonheapMaxSize = new long[RRD_SIZE];
- threadCount = new int[RRD_SIZE];
- peakThreadCount = new int[RRD_SIZE];
- systemLoadAverage = new double[RRD_SIZE];
- GarbageCollectorInfo[] gcInfos = hbSchema.getGarbageCollectorInfos();
- int gcN = gcInfos.length;
- gcNames = new String[gcN];
- for (int i = 0; i < gcN; ++i) {
- gcNames[i] = gcInfos[i].getName();
- }
- gcCollectionCounts = new long[gcN][RRD_SIZE];
- gcCollectionTimes = new long[gcN][RRD_SIZE];
- netPayloadBytesRead = new long[RRD_SIZE];
- netPayloadBytesWritten = new long[RRD_SIZE];
- netSignalingBytesRead = new long[RRD_SIZE];
- netSignalingBytesWritten = new long[RRD_SIZE];
- datasetNetPayloadBytesRead = new long[RRD_SIZE];
- datasetNetPayloadBytesWritten = new long[RRD_SIZE];
- datasetNetSignalingBytesRead = new long[RRD_SIZE];
- datasetNetSignalingBytesWritten = new long[RRD_SIZE];
- ipcMessagesSent = new long[RRD_SIZE];
- ipcMessageBytesSent = new long[RRD_SIZE];
- ipcMessagesReceived = new long[RRD_SIZE];
- ipcMessageBytesReceived = new long[RRD_SIZE];
+ hbTime = new long[RRD_SIZE];
+ heapInitSize = new long[RRD_SIZE];
+ heapUsedSize = new long[RRD_SIZE];
+ heapCommittedSize = new long[RRD_SIZE];
+ heapMaxSize = new long[RRD_SIZE];
+ nonheapInitSize = new long[RRD_SIZE];
+ nonheapUsedSize = new long[RRD_SIZE];
+ nonheapCommittedSize = new long[RRD_SIZE];
+ nonheapMaxSize = new long[RRD_SIZE];
+ threadCount = new int[RRD_SIZE];
+ peakThreadCount = new int[RRD_SIZE];
+ systemLoadAverage = new double[RRD_SIZE];
+ GarbageCollectorInfo[] gcInfos = hbSchema.getGarbageCollectorInfos();
+ int gcN = gcInfos.length;
+ gcNames = new String[gcN];
+ for (int i = 0; i < gcN; ++i) {
+ gcNames[i] = gcInfos[i].getName();
+ }
+ gcCollectionCounts = new long[gcN][RRD_SIZE];
+ gcCollectionTimes = new long[gcN][RRD_SIZE];
+ netPayloadBytesRead = new long[RRD_SIZE];
+ netPayloadBytesWritten = new long[RRD_SIZE];
+ netSignalingBytesRead = new long[RRD_SIZE];
+ netSignalingBytesWritten = new long[RRD_SIZE];
+ datasetNetPayloadBytesRead = new long[RRD_SIZE];
+ datasetNetPayloadBytesWritten = new long[RRD_SIZE];
+ datasetNetSignalingBytesRead = new long[RRD_SIZE];
+ datasetNetSignalingBytesWritten = new long[RRD_SIZE];
+ ipcMessagesSent = new long[RRD_SIZE];
+ ipcMessageBytesSent = new long[RRD_SIZE];
+ ipcMessagesReceived = new long[RRD_SIZE];
+ ipcMessageBytesReceived = new long[RRD_SIZE];
- rrdPtr = 0;
- }
+ rrdPtr = 0;
+ }
- public void notifyHeartbeat(HeartbeatData hbData) {
- lastHeartbeatDuration = 0;
-
- hbTime[rrdPtr] = System.currentTimeMillis();
- heapInitSize[rrdPtr] = hbData.heapInitSize;
- heapUsedSize[rrdPtr] = hbData.heapUsedSize;
- heapCommittedSize[rrdPtr] = hbData.heapCommittedSize;
- heapMaxSize[rrdPtr] = hbData.heapMaxSize;
- nonheapInitSize[rrdPtr] = hbData.nonheapInitSize;
- nonheapUsedSize[rrdPtr] = hbData.nonheapUsedSize;
- nonheapCommittedSize[rrdPtr] = hbData.nonheapCommittedSize;
- nonheapMaxSize[rrdPtr] = hbData.nonheapMaxSize;
- threadCount[rrdPtr] = hbData.threadCount;
- peakThreadCount[rrdPtr] = hbData.peakThreadCount;
- systemLoadAverage[rrdPtr] = hbData.systemLoadAverage;
- int gcN = hbSchema.getGarbageCollectorInfos().length;
- for (int i = 0; i < gcN; ++i) {
- gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
- gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
- }
- netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
- netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
- netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
- netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
- datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
- datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
- datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
- datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
- ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
- ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
- ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
- ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
- rrdPtr = (rrdPtr + 1) % RRD_SIZE;
- }
+ public void notifyHeartbeat(HeartbeatData hbData) {
+ lastHeartbeatDuration = 0;
+ hbTime[rrdPtr] = System.currentTimeMillis();
+ if (hbData != null) {
+ heapInitSize[rrdPtr] = hbData.heapInitSize;
+ heapUsedSize[rrdPtr] = hbData.heapUsedSize;
+ heapCommittedSize[rrdPtr] = hbData.heapCommittedSize;
+ heapMaxSize[rrdPtr] = hbData.heapMaxSize;
+ nonheapInitSize[rrdPtr] = hbData.nonheapInitSize;
+ nonheapUsedSize[rrdPtr] = hbData.nonheapUsedSize;
+ nonheapCommittedSize[rrdPtr] = hbData.nonheapCommittedSize;
+ nonheapMaxSize[rrdPtr] = hbData.nonheapMaxSize;
+ threadCount[rrdPtr] = hbData.threadCount;
+ peakThreadCount[rrdPtr] = hbData.peakThreadCount;
+ systemLoadAverage[rrdPtr] = hbData.systemLoadAverage;
+ int gcN = hbSchema.getGarbageCollectorInfos().length;
+ for (int i = 0; i < gcN; ++i) {
+ gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
+ gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
+ }
+ netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
+ netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
+ netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
+ netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
+ datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
+ datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
+ datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
+ datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
+ ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
+ ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
+ ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
+ ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
+ }
+ rrdPtr = (rrdPtr + 1) % RRD_SIZE;
+ }
- public int incrementLastHeartbeatDuration() {
- return lastHeartbeatDuration++;
- }
+ public int incrementLastHeartbeatDuration() {
+ return lastHeartbeatDuration++;
+ }
- public int getLastHeartbeatDuration() {
- return lastHeartbeatDuration;
- }
+ public int getLastHeartbeatDuration() {
+ return lastHeartbeatDuration;
+ }
- public INodeController getNodeController() {
- return nodeController;
- }
+ public INodeController getNodeController() {
+ return nodeController;
+ }
- public NCConfig getNCConfig() {
- return ncConfig;
- }
+ public NCConfig getNCConfig() {
+ return ncConfig;
+ }
- public Set<JobId> getActiveJobIds() {
- return activeJobIds;
- }
+ public Set<JobId> getActiveJobIds() {
+ return activeJobIds;
+ }
- public NetworkAddress getDataPort() {
- return dataPort;
- }
+ public NetworkAddress getDataPort() {
+ return dataPort;
+ }
- public NetworkAddress getDatasetPort() {
- return datasetPort;
- }
+ public NetworkAddress getDatasetPort() {
+ return datasetPort;
+ }
- public JSONObject toSummaryJSON() throws JSONException {
- JSONObject o = new JSONObject();
- o.put("node-id", ncConfig.nodeId);
- o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
- o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
+ public JSONObject toSummaryJSON() throws JSONException {
+ JSONObject o = new JSONObject();
+ o.put("node-id", ncConfig.nodeId);
+ o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
+ o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1)
+ % RRD_SIZE]);
- return o;
- }
+ return o;
+ }
- public JSONObject toDetailedJSON() throws JSONException {
- JSONObject o = new JSONObject();
+ public JSONObject toDetailedJSON() throws JSONException {
+ JSONObject o = new JSONObject();
- o.put("node-id", ncConfig.nodeId);
- o.put("os-name", osName);
- o.put("arch", arch);
- o.put("os-version", osVersion);
- o.put("num-processors", nProcessors);
- o.put("vm-name", vmName);
- o.put("vm-version", vmVersion);
- o.put("vm-vendor", vmVendor);
- o.put("classpath", classpath);
- o.put("library-path", libraryPath);
- o.put("boot-classpath", bootClasspath);
- o.put("input-arguments", new JSONArray(inputArguments));
- o.put("rrd-ptr", rrdPtr);
- o.put("heartbeat-times", hbTime);
- o.put("heap-init-sizes", heapInitSize);
- o.put("heap-used-sizes", heapUsedSize);
- o.put("heap-committed-sizes", heapCommittedSize);
- o.put("heap-max-sizes", heapMaxSize);
- o.put("nonheap-init-sizes", nonheapInitSize);
- o.put("nonheap-used-sizes", nonheapUsedSize);
- o.put("nonheap-committed-sizes", nonheapCommittedSize);
- o.put("nonheap-max-sizes", nonheapMaxSize);
- o.put("thread-counts", threadCount);
- o.put("peak-thread-counts", peakThreadCount);
- o.put("system-load-averages", systemLoadAverage);
- o.put("gc-names", gcNames);
- o.put("gc-collection-counts", gcCollectionCounts);
- o.put("gc-collection-times", gcCollectionTimes);
- o.put("net-payload-bytes-read", netPayloadBytesRead);
- o.put("net-payload-bytes-written", netPayloadBytesWritten);
- o.put("net-signaling-bytes-read", netSignalingBytesRead);
- o.put("net-signaling-bytes-written", netSignalingBytesWritten);
- o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
- o.put("dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
- o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
- o.put("dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
- o.put("ipc-messages-sent", ipcMessagesSent);
- o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
- o.put("ipc-messages-received", ipcMessagesReceived);
- o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
+ o.put("node-id", ncConfig.nodeId);
+ o.put("os-name", osName);
+ o.put("arch", arch);
+ o.put("os-version", osVersion);
+ o.put("num-processors", nProcessors);
+ o.put("vm-name", vmName);
+ o.put("vm-version", vmVersion);
+ o.put("vm-vendor", vmVendor);
+ o.put("classpath", classpath);
+ o.put("library-path", libraryPath);
+ o.put("boot-classpath", bootClasspath);
+ o.put("input-arguments", new JSONArray(inputArguments));
+ o.put("rrd-ptr", rrdPtr);
+ o.put("heartbeat-times", hbTime);
+ o.put("heap-init-sizes", heapInitSize);
+ o.put("heap-used-sizes", heapUsedSize);
+ o.put("heap-committed-sizes", heapCommittedSize);
+ o.put("heap-max-sizes", heapMaxSize);
+ o.put("nonheap-init-sizes", nonheapInitSize);
+ o.put("nonheap-used-sizes", nonheapUsedSize);
+ o.put("nonheap-committed-sizes", nonheapCommittedSize);
+ o.put("nonheap-max-sizes", nonheapMaxSize);
+ o.put("thread-counts", threadCount);
+ o.put("peak-thread-counts", peakThreadCount);
+ o.put("system-load-averages", systemLoadAverage);
+ o.put("gc-names", gcNames);
+ o.put("gc-collection-counts", gcCollectionCounts);
+ o.put("gc-collection-times", gcCollectionTimes);
+ o.put("net-payload-bytes-read", netPayloadBytesRead);
+ o.put("net-payload-bytes-written", netPayloadBytesWritten);
+ o.put("net-signaling-bytes-read", netSignalingBytesRead);
+ o.put("net-signaling-bytes-written", netSignalingBytesWritten);
+ o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
+ o.put("dataset-net-payload-bytes-written",
+ datasetNetPayloadBytesWritten);
+ o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
+ o.put("dataset-net-signaling-bytes-written",
+ datasetNetSignalingBytesWritten);
+ o.put("ipc-messages-sent", ipcMessagesSent);
+ o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
+ o.put("ipc-messages-received", ipcMessagesReceived);
+ o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
- return o;
- }
+ return o;
+ }
}
\ No newline at end of file
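
All of the counters above share one layout: a 720-slot round-robin ("RRD") array indexed by rrdPtr, so each heartbeat overwrites the oldest sample and the newest one sits at (rrdPtr + RRD_SIZE - 1) % RRD_SIZE, which is exactly the index toSummaryJSON() reads. A generic sketch of the pattern (illustrative; NodeControllerState itself relies on its callers for synchronization):

    // Minimal round-robin sample buffer; single writer assumed.
    class RrdSketch {
        private final long[] samples = new long[720];
        private int ptr = 0;

        void record(long value) {
            samples[ptr] = value;
            ptr = (ptr + 1) % samples.length; // wrap: newest overwrites oldest
        }

        long latest() {
            return samples[(ptr + samples.length - 1) % samples.length];
        }
    }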
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 2166620..bae0eb5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -46,6 +46,7 @@
import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.utils.ExceptionUtils;
public class JobRun implements IJobStatusConditionVariable {
private final DeploymentId deploymentId;
@@ -347,7 +348,7 @@
taskAttempt.put("end-time", ta.getEndTime());
List<Exception> exceptions = ta.getExceptions();
if (exceptions != null && !exceptions.isEmpty()) {
- List<Exception> filteredExceptions = ExceptionFilterUtils
+ List<Exception> filteredExceptions = ExceptionUtils
.getActualExceptions(exceptions);
for (Exception exception : filteredExceptions) {
StringWriter exceptionWriter = new StringWriter();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 4d2ad6b..3863eda 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -190,18 +190,29 @@
ActivityId ac2 = ac.getConsumerActivity(cdId);
Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
int nConsumers = ac2TaskStates.length;
- for (int i = 0; i < nProducers; ++i) {
- c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
- .getTaskId());
- if (cInfoList == null) {
- cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
- taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
- }
- for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+ if (c.allProducersToAllConsumers()) {
+ List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+ for (int j = 0; j < nConsumers; j++) {
TaskId targetTID = ac2TaskStates[j].getTaskId();
cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
}
+ for (int i = 0; i < nProducers; ++i) {
+ taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+ }
+ } else {
+ for (int i = 0; i < nProducers; ++i) {
+ c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+ List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
+ .getTaskId());
+ if (cInfoList == null) {
+ cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+ taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+ }
+ for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+ TaskId targetTID = ac2TaskStates[j].getTaskId();
+ cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
+ }
+ }
}
}
}
@@ -341,9 +352,15 @@
int nConsumers = ac2TaskStates.length;
int[] fanouts = new int[nProducers];
- for (int i = 0; i < nProducers; ++i) {
- c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- fanouts[i] = targetBitmap.cardinality();
+ if (c.allProducersToAllConsumers()) {
+ for (int i = 0; i < nProducers; ++i) {
+ fanouts[i] = nConsumers;
+ }
+ } else {
+ for (int i = 0; i < nProducers; ++i) {
+ c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+ fanouts[i] = targetBitmap.cardinality();
+ }
}
IConnectorPolicy cp = assignConnectorPolicy(ac, c, nProducers, nConsumers, fanouts);
cPolicyMap.put(cdId, cp);
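
A standalone restatement of the fanout shortcut above (illustrative sketch): with P producers and C consumers, the dense path fills the array in O(P) instead of running one O(C) bitmap pass per producer.

    import java.util.Arrays;
    import java.util.BitSet;

    import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;

    final class FanoutSketch {
        static int[] fanouts(IConnectorDescriptor c, int nProducers, int nConsumers) {
            int[] fanouts = new int[nProducers];
            if (c.allProducersToAllConsumers()) {
                Arrays.fill(fanouts, nConsumers); // dense: known without probing
            } else {
                BitSet targetBitmap = new BitSet();
                for (int i = 0; i < nProducers; ++i) {
                    c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
                    fanouts[i] = targetBitmap.cardinality();
                }
            }
            return fanouts;
        }
    }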
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 0b8346b..fd6360a 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -27,6 +27,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
import edu.uci.ics.hyracks.api.constraints.Constraint;
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
@@ -45,6 +48,7 @@
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.Task;
@@ -458,13 +462,14 @@
private void abortJob(List<Exception> exceptions) {
Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<TaskCluster>(inProgressTaskClusters);
for (TaskCluster tc : inProgressTaskClustersCopy) {
- abortTaskCluster(findLastTaskClusterAttempt(tc));
+ abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED);
}
assert inProgressTaskClusters.isEmpty();
ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.FAILURE, exceptions));
}
- private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
+ private void abortTaskCluster(TaskClusterAttempt tcAttempt,
+ TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
LOGGER.fine("Aborting task cluster: " + tcAttempt.getAttempt());
Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
@@ -477,11 +482,13 @@
ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
ta.setEndTime(System.currentTimeMillis());
List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
- if (abortTaskAttempts == null) {
+ if (status == TaskAttempt.TaskStatus.RUNNING && abortTaskAttempts == null) {
abortTaskAttempts = new ArrayList<TaskAttemptId>();
abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
}
- abortTaskAttempts.add(taId);
+ if (status == TaskAttempt.TaskStatus.RUNNING) {
+ abortTaskAttempts.add(taId);
+ }
}
}
final JobId jobId = jobRun.getJobId();
@@ -505,6 +512,9 @@
PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
+
+ tcAttempt.setStatus(failedOrAbortedStatus);
+ tcAttempt.setEndTime(System.currentTimeMillis());
}
private void abortDoomedTaskClusters() throws HyracksException {
@@ -519,9 +529,7 @@
for (TaskCluster tc : doomedTaskClusters) {
TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
if (tca != null) {
- abortTaskCluster(tca);
- tca.setEndTime(System.currentTimeMillis());
- tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+ abortTaskCluster(tca, TaskClusterAttempt.TaskClusterStatus.ABORTED);
}
}
}
@@ -608,9 +616,7 @@
if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
- abortTaskCluster(lastAttempt);
- lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
- lastAttempt.setEndTime(System.currentTimeMillis());
+ abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED);
abortDoomedTaskClusters();
if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
abortJob(exceptions);
@@ -635,32 +641,41 @@
public void notifyNodeFailures(Set<String> deadNodes) {
try {
jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
+ jobRun.getParticipatingNodeIds().removeAll(deadNodes);
+ jobRun.getCleanupPendingNodeIds().removeAll(deadNodes);
+ if (jobRun.getPendingStatus() != null && jobRun.getCleanupPendingNodeIds().isEmpty()) {
+ finishJob(jobRun);
+ return;
+ }
for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
- TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
- if (taskClusters != null) {
- for (TaskCluster tc : taskClusters) {
- TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
- if (lastTaskClusterAttempt != null
- && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
- .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
- boolean abort = false;
- for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
- assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
- if (deadNodes.contains(ta.getNodeId())) {
- ta.setStatus(
- TaskAttempt.TaskStatus.FAILED,
- Collections.singletonList(new Exception("Node " + ta.getNodeId()
- + " failed")));
- ta.setEndTime(System.currentTimeMillis());
- abort = true;
+ if (isPlanned(ac)) {
+ TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
+ if (taskClusters != null) {
+ for (TaskCluster tc : taskClusters) {
+ TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
+ if (lastTaskClusterAttempt != null
+ && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
+ .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
+ boolean abort = false;
+ for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
+ assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
+ if (deadNodes.contains(ta.getNodeId())) {
+ ta.setStatus(
+ TaskAttempt.TaskStatus.FAILED,
+ Collections.singletonList(new Exception("Node " + ta.getNodeId()
+ + " failed")));
+ ta.setEndTime(System.currentTimeMillis());
+ abort = true;
+ }
+ }
+ if (abort) {
+ abortTaskCluster(lastTaskClusterAttempt,
+ TaskClusterAttempt.TaskClusterStatus.ABORTED);
}
}
- if (abort) {
- abortTaskCluster(lastTaskClusterAttempt);
- }
}
+ abortDoomedTaskClusters();
}
- abortDoomedTaskClusters();
}
}
startRunnableActivityClusters();
@@ -668,4 +683,37 @@
abortJob(Collections.singletonList(e));
}
}
+
+ private void finishJob(final JobRun run) {
+ JobId jobId = run.getJobId();
+ CCApplicationContext appCtx = ccs.getApplicationContext();
+ if (appCtx != null) {
+ try {
+ appCtx.notifyJobFinish(jobId);
+ } catch (HyracksException e) {
+ e.printStackTrace();
+ }
+ }
+ run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+ ccs.getActiveRunMap().remove(jobId);
+ ccs.getRunMapArchive().put(jobId, run);
+ ccs.getRunHistory().put(jobId, run.getExceptions());
+ try {
+ ccs.getJobLogFile().log(createJobLogObject(run));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private JSONObject createJobLogObject(final JobRun run) {
+ JSONObject jobLogObject = new JSONObject();
+ try {
+ ActivityClusterGraph acg = run.getActivityClusterGraph();
+ jobLogObject.put("activity-cluster-graph", acg.toJSON());
+ jobLogObject.put("job-run", run.toJSON());
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ return jobLogObject;
+ }
}
\ No newline at end of file
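
The refactoring above moves the attempt's terminal bookkeeping into abortTaskCluster() itself: callers pass the terminal status (FAILED when the attempt broke, ABORTED for collateral aborts), status and end time are stamped in one place, and only attempts still RUNNING generate abort messages to node controllers. A simplified self-contained model of that contract (the types are stand-ins, not the real Hyracks classes):

    // Stand-in types modeling the new abortTaskCluster(attempt, status) contract.
    enum TaskClusterStatus { RUNNING, COMPLETED, FAILED, ABORTED }

    class TaskClusterAttemptModel {
        private TaskClusterStatus status = TaskClusterStatus.RUNNING;
        private long endTime = -1;

        // The caller decides FAILED vs ABORTED; status and end time are set
        // in exactly one place, so no call site can forget either of them.
        void abort(TaskClusterStatus failedOrAbortedStatus) {
            this.status = failedOrAbortedStatus;
            this.endTime = System.currentTimeMillis();
        }
    }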
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
index bc58d1e..5109078 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.messages.IMessage;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
@@ -57,6 +58,7 @@
LOGGER.log(Level.WARNING, "Error in stats reporting", e);
throw new RuntimeException(e);
}
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 6e8ddf0..22c3348 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;
@@ -48,11 +49,16 @@
@Override
public void run() {
+ LOGGER.warning("Cleanup for JobRun with id: " + jobId);
final JobRun run = ccs.getActiveRunMap().get(jobId);
if (run == null) {
LOGGER.warning("Unable to find JobRun with id: " + jobId);
return;
}
+ if (run.getPendingStatus() != null && run.getCleanupPendingNodeIds().isEmpty()) {
+ finishJob(run);
+ return;
+ }
if (run.getPendingStatus() != null) {
LOGGER.warning("Ignoring duplicate cleanup for JobRun with id: " + jobId);
return;
@@ -63,33 +69,47 @@
run.setPendingStatus(status, exceptions);
}
if (targetNodes != null && !targetNodes.isEmpty()) {
+ Set<String> toDelete = new HashSet<String>();
for (String n : targetNodes) {
NodeControllerState ncs = ccs.getNodeMap().get(n);
try {
- ncs.getNodeController().cleanUpJoblet(jobId, status);
+ if (ncs == null) {
+ toDelete.add(n);
+ } else {
+ ncs.getNodeController().cleanUpJoblet(jobId, status);
+ }
} catch (Exception e) {
e.printStackTrace();
}
}
+ targetNodes.removeAll(toDelete);
+ run.getCleanupPendingNodeIds().removeAll(toDelete);
+ if (run.getCleanupPendingNodeIds().isEmpty()) {
+ finishJob(run);
+ }
} else {
- CCApplicationContext appCtx = ccs.getApplicationContext();
- if (appCtx != null) {
- try {
- appCtx.notifyJobFinish(jobId);
- } catch (HyracksException e) {
- e.printStackTrace();
- }
- }
- run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
- ccs.getActiveRunMap().remove(jobId);
- ccs.getRunMapArchive().put(jobId, run);
- ccs.getRunHistory().put(jobId, run.getExceptions());
+ finishJob(run);
+ }
+ }
+
+ private void finishJob(final JobRun run) {
+ CCApplicationContext appCtx = ccs.getApplicationContext();
+ if (appCtx != null) {
try {
- ccs.getJobLogFile().log(createJobLogObject(run));
- } catch (Exception e) {
- throw new RuntimeException(e);
+ appCtx.notifyJobFinish(jobId);
+ } catch (HyracksException e) {
+ e.printStackTrace();
}
}
+ run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+ ccs.getActiveRunMap().remove(jobId);
+ ccs.getRunMapArchive().put(jobId, run);
+ ccs.getRunHistory().put(jobId, run.getExceptions());
+ try {
+ ccs.getJobLogFile().log(createJobLogObject(run));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
private JSONObject createJobLogObject(final JobRun run) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 5eb851a..66186ca 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
public class JobletCleanupNotificationWork extends AbstractWork {
@@ -45,6 +46,7 @@
@Override
public void run() {
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
final JobRun run = ccs.getActiveRunMap().get(jobId);
Set<String> cleanupPendingNodes = run.getCleanupPendingNodeIds();
if (!cleanupPendingNodes.remove(nodeId)) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
index 970a45d..eef6b96 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
@@ -14,11 +14,10 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
-import java.util.Map;
import java.util.logging.Level;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
@@ -35,11 +34,7 @@
@Override
protected void doRun() throws Exception {
- Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
- NodeControllerState state = nodeMap.get(nodeId);
- if (state != null) {
- state.notifyHeartbeat(hbData);
- }
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, hbData);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
index c4c8873..e400fa8 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
@@ -47,6 +48,7 @@
/** triggered remotely by a NC to notify that the NC is deployed */
DeploymentRun dRun = ccs.getDeploymentRun(deploymentId);
dRun.notifyDeploymentStatus(nodeId, deploymentStatus);
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
index 8e5050c..77cddf3 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -50,6 +51,7 @@
jobletProfile.getTaskProfiles().put(taId, statistics);
}
run.getScheduler().notifyTaskComplete(ta);
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
} catch (HyracksException e) {
e.printStackTrace();
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index e00025d..3578fd62 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
public class TaskFailureWork extends AbstractTaskLifecycleWork {
private final List<Exception> exceptions;
@@ -38,6 +39,7 @@
ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
new file mode 100644
index 0000000..51bbc16
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.hyracks.control.cc.work.utils;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+
+public class HeartbeatUtils {
+
+ public static void notifyHeartbeat(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
+ Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+ NodeControllerState state = nodeMap.get(nodeId);
+ if (state != null) {
+ state.notifyHeartbeat(hbData);
+ }
+ }
+}
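
This utility captures a pattern now shared by ApplicationMessageWork, JobletCleanupNotificationWork, NotifyDeployBinaryWork, TaskCompleteWork, and TaskFailureWork: any message received from a node proves the node is alive, so the handler refreshes its heartbeat as a side effect. Passing null resets only the liveness timer and fills no RRD slots, which is why notifyHeartbeat() gained its null check above. Typical usage inside a work item's run() (ccs and nodeId are fields those classes already hold):

    // Any CC-side handler of an NC message can refresh that node's liveness:
    HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null); // null = liveness only, no stats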
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
index 69b560c..5a6d849 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -55,6 +55,7 @@
} catch (InterruptedException e) {
LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
// There isn't much we can do really here
+                break; // an explicit interrupt from another thread is a shutdown signal, so stop the sweeper
}
}
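
The added break makes the sweeper treat interruption as a shutdown request rather than logging and looping forever. The conventional idiom also re-asserts the interrupt flag before exiting; a sketch under assumed names (sweepPeriod and sweep() stand in for the sweeper's real members):

    // Illustrative interrupt-handling idiom for a cleaner thread.
    while (!Thread.currentThread().isInterrupted()) {
        try {
            Thread.sleep(sweepPeriod); // assumed: the sweep interval
            sweep();                   // assumed: one cleanup pass
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            break;                              // shut the sweeper down
        }
    }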
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/utils/ExceptionUtils.java
similarity index 69%
rename from hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java
rename to hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/utils/ExceptionUtils.java
index 44f3bec..cbdc6e5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ExceptionFilterUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/utils/ExceptionUtils.java
@@ -12,15 +12,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.control.common.utils;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
/**
* @author yingyib
*/
-public class ExceptionFilterUtils {
+public class ExceptionUtils {
public static List<Exception> getActualExceptions(List<Exception> allExceptions) {
List<Exception> exceptions = new ArrayList<Exception>();
@@ -32,6 +35,17 @@
return exceptions;
}
+ public static void setNodeIds(Collection<Exception> exceptions, String nodeId) {
+ List<Exception> newExceptions = new ArrayList<Exception>();
+ for (Exception e : exceptions) {
+ HyracksDataException newException = new HyracksDataException(e);
+ newException.setNodeId(nodeId);
+ newExceptions.add(newException);
+ }
+ exceptions.clear();
+ exceptions.addAll(newExceptions);
+ }
+
private static boolean possibleRootCause(Throwable exception) {
Throwable cause = exception;
while ((cause = cause.getCause()) != null) {
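
Note the in-place contract of setNodeIds(): the collection is cleared and repopulated with HyracksDataException wrappers, so callers keep their reference while every element afterwards carries the node id, with the original exception preserved as the cause. A usage sketch (node id and message are made up):

    import java.util.ArrayList;
    import java.util.List;

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.control.common.utils.ExceptionUtils;

    public class SetNodeIdsExample {
        public static void main(String[] args) {
            List<Exception> exceptions = new ArrayList<Exception>();
            exceptions.add(new IllegalStateException("frame writer closed"));
            ExceptionUtils.setNodeIds(exceptions, "nc7");
            HyracksDataException tagged = (HyracksDataException) exceptions.get(0);
            System.out.println(tagged.getNodeId()); // -> nc7
            System.out.println(tagged.getCause());  // the original IllegalStateException
        }
    }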
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index f12c981..58e12cf 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -87,6 +87,7 @@
private class WorkerThread extends Thread {
WorkerThread() {
setDaemon(true);
+ setPriority(MAX_PRIORITY);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 6049a3b..b7fcf7f 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -266,7 +266,7 @@
heartbeatTask = new HeartbeatTask(ccs);
- // Schedule heartbeat generator.
+ // Schedule heartbeat generator.
timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
if (nodeParameters.getProfileDumpPeriod() > 0) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 53e5a01..09eeab6 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -49,6 +49,7 @@
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.utils.ExceptionUtils;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
@@ -243,6 +244,7 @@
addPendingThread(thread);
String oldName = thread.getName();
thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+ thread.setPriority(Thread.MIN_PRIORITY);
try {
pushFrames(collector, writer);
} catch (HyracksDataException e) {
@@ -277,6 +279,7 @@
}
if (!exceptions.isEmpty()) {
NodeControllerService ncs = joblet.getNodeController();
+ ExceptionUtils.setNodeIds(exceptions, ncs.getId());
ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
}
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
index 30b2482..df4d296 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
@@ -38,4 +38,9 @@
sourceBitmap.clear();
sourceBitmap.set(0, nProducerPartitions);
}
+
+ @Override
+    public boolean allProducersToAllConsumers() {
+ return true;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index 466fead..20a0ed1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -82,4 +82,9 @@
sourceBitmap.clear();
sourceBitmap.set(consumerIndex);
}
+
+ @Override
+ public boolean allProducersToAllConsumers() {
+ return false;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
index 75553e1..85f80ac 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
/**
@@ -54,7 +55,20 @@
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
}
-
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @param topology
+     *            the Hyracks cluster topology
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology) throws HyracksException {
+ scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, topology);
+ }
+
/**
* The constructor of the scheduler.
*
@@ -62,7 +76,8 @@
* the mapping from nc names to nc infos
* @throws HyracksException
*/
- public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder) throws HyracksException {
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder)
+ throws HyracksException {
scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, builder);
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index a85a174..c14f23a 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -317,7 +317,11 @@
if (!releasedLatches) {
for (int i = 0; i < nodeFrontiers.size(); i++) {
- nodeFrontiers.get(i).page.releaseWriteLatch();
+ try {
+ nodeFrontiers.get(i).page.releaseWriteLatch();
+ } catch (Exception e) {
+                // ignore IllegalMonitorStateException: the latch may already have been released
+ }
bufferCache.unpin(nodeFrontiers.get(i).page);
}
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 1e0d87a..a28d059 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -76,6 +76,8 @@
public static final String UPDATE_INTENSIVE = "pregelix.updateIntensive";
/** the check point hook */
public static final String CKP_CLASS = "pregelix.checkpointHook";
+    /** the recovery count */
+ public static final String RECOVERY_COUNT = "pregelix.recoveryCount";
/**
* Construct a Pregelix job from an existing configuration
@@ -222,6 +224,15 @@
final public void setCheckpointHook(Class<?> ckpClass) {
getConfiguration().setClass(CKP_CLASS, ckpClass, ICheckpointHook.class);
}
+
+ /**
+     * Set the number of recovery attempts made so far for the current job
+     *
+     * @param recoveryCount
+ */
+ final public void setRecoveryCount(int recoveryCount) {
+ getConfiguration().setInt(RECOVERY_COUNT, recoveryCount);
+ }
@Override
public String toString() {
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index d68ad2c..e3950c8 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -549,4 +549,47 @@
public static String getGlobalAggregateSpillingDirName(Configuration conf, long superStep) {
return "/tmp/pregelix/agg/" + conf.get(PregelixJob.JOB_ID) + "/" + superStep;
}
+
+ /**
+ * Get the path for vertex checkpointing
+ *
+ * @param conf
+ * @param lastSuperStep
+ * @return the path for vertex checkpointing
+ */
+ public static String getVertexCheckpointPath(Configuration conf, long lastSuperStep) {
+ return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/vertex/" + lastSuperStep;
+ }
+
+ /**
+ * Get the path for message checkpointing
+ *
+ * @param conf
+ * @param lastSuperStep
+ * @return the path for message checkpointing
+ */
+ public static String getMessageCheckpointPath(Configuration conf, long lastSuperStep) {
+ String path = "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/message/" + lastSuperStep;
+ return path;
+ }
+
+ /**
+     * Get the path for secondary index checkpointing
+     *
+     * @param conf
+     * @param lastSuperStep
+     * @return the path for secondary index checkpointing
+ */
+ public static String getSecondaryIndexCheckpointPath(Configuration conf, long lastSuperStep) {
+ return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/secondaryindex/" + lastSuperStep;
+ }
+
+    /**
+ * Get the recovery count
+ *
+ * @return recovery count
+ */
+ public static int getRecoveryCount(Configuration conf) {
+ return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
+ }
}
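
The three helpers fix one checkpoint directory layout per job and superstep. Assuming getJobId() reads the same PregelixJob.JOB_ID key used by getGlobalAggregateSpillingDirName() above, a job with id "job_1" checkpointing after superstep 20 would resolve to the paths shown in the comments below (illustrative values):

    import org.apache.hadoop.conf.Configuration;

    import edu.uci.ics.pregelix.api.job.PregelixJob;
    import edu.uci.ics.pregelix.api.util.BspUtils;

    public class CheckpointPathExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(PregelixJob.JOB_ID, "job_1");
            System.out.println(BspUtils.getVertexCheckpointPath(conf, 20));
            // -> /tmp/ckpoint/job_1/vertex/20
            System.out.println(BspUtils.getMessageCheckpointPath(conf, 20));
            // -> /tmp/ckpoint/job_1/message/20
            System.out.println(BspUtils.getSecondaryIndexCheckpointPath(conf, 20));
            // -> /tmp/ckpoint/job_1/secondaryindex/20
        }
    }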
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
index 6a4a660..3455233 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
@@ -25,7 +25,7 @@
@Override
public boolean checkpoint(int superstep) {
- if (superstep % 5 == 0) {
+ if (superstep % 100 == 0) {
return true;
} else {
return false;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 7bd2cf8..5530144 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -23,7 +23,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,6 +52,7 @@
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@SuppressWarnings("rawtypes")
@@ -89,6 +92,7 @@
this.profiling = profiling;
PregelixJob currentJob = jobs.get(0);
PregelixJob lastJob = currentJob;
+ addHadoopConfiguration(currentJob, ipAddress, port, true);
JobGen jobGen = null;
/** prepare job -- deploy jars */
@@ -99,27 +103,29 @@
IntWritable lastSnapshotSuperstep = new IntWritable(0);
boolean failed = false;
int retryCount = 0;
- int maxRetryCount = 1;
+ int maxRetryCount = 3;
+ jobGen = selectJobGen(planChoice, currentJob);
do {
try {
for (int i = lastSnapshotJobIndex.get(); i < jobs.size(); i++) {
lastJob = currentJob;
currentJob = jobs.get(i);
+ currentJob.setRecoveryCount(retryCount);
/** add hadoop configurations */
- addHadoopConfiguration(currentJob, ipAddress, port);
+ addHadoopConfiguration(currentJob, ipAddress, port, failed);
ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
/** load the data */
- if (i == 0 || compatible(lastJob, currentJob)) {
+ if ((i == 0 || compatible(lastJob, currentJob)) && !failed) {
if (i != 0) {
finishJobs(jobGen, deploymentId);
/** invalidate/clear checkpoint */
lastSnapshotJobIndex.set(0);
lastSnapshotSuperstep.set(0);
}
- jobGen = selectJobGen(planChoice, currentJob);
+ jobGen.reset(currentJob);
loadData(currentJob, jobGen, deploymentId);
} else {
jobGen.reset(currentJob);
@@ -137,12 +143,16 @@
/** clear checkpoints if any */
jobGen.clearCheckpoints();
hcc.unDeployBinary(deploymentId);
- } catch (IOException ioe) {
- /** disk failures */
- //restart from snapshot
- failed = true;
- retryCount++;
- throw new HyracksException(ioe);
+ } catch (Exception e1) {
+ Set<String> blackListNodes = new HashSet<String>();
+ /** disk failures or node failures */
+ if (ExceptionUtilities.recoverable(e1, blackListNodes)) {
+ ClusterConfig.addToBlackListNodes(blackListNodes);
+ failed = true;
+ retryCount++;
+ } else {
+ throw e1;
+ }
}
} while (failed && retryCount < maxRetryCount);
LOG.info("job finished");
@@ -222,9 +232,9 @@
}
private DeploymentId prepareJobs(String ipAddress, int port) throws Exception {
- if (hcc == null)
+ if (hcc == null) {
hcc = new HyracksConnection(ipAddress, port);
-
+ }
URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
List<File> jars = new ArrayList<File>();
URL[] urls = classLoader.getURLs();
@@ -235,7 +245,8 @@
return deploymentId;
}
- private void addHadoopConfiguration(PregelixJob job, String ipAddress, int port) throws HyracksException {
+ private void addHadoopConfiguration(PregelixJob job, String ipAddress, int port, boolean loadClusterConfig)
+ throws HyracksException {
URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
if (hadoopCore != null) {
job.getConfiguration().addResource(hadoopCore);
@@ -248,7 +259,9 @@
if (hadoopHdfs != null) {
job.getConfiguration().addResource(hadoopHdfs);
}
- ClusterConfig.loadClusterConfig(ipAddress, port);
+ if (loadClusterConfig) {
+ ClusterConfig.loadClusterConfig(ipAddress, port);
+ }
}
private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int currentJobIndex,
@@ -256,8 +269,13 @@
throws Exception {
if (doRecovery) {
/** reload the checkpoint */
- runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
-
+ if (snapshotSuperstep.get() > 0) {
+ runClearState(deploymentId, jobGen);
+ runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
+ } else {
+ runClearState(deploymentId, jobGen);
+ loadData(job, jobGen, deploymentId);
+ }
}
int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
boolean terminate = false;
@@ -272,8 +290,8 @@
|| IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
if (ckpHook.checkpoint(i)) {
runCheckpoint(deploymentId, jobGen, i);
- snapshotSuperstep.set(i);
snapshotJobIndex.set(currentJobIndex);
+ snapshotSuperstep.set(i);
}
i++;
} while (!terminate);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 36723a6..4863378 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -36,9 +36,12 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -61,10 +64,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -102,10 +102,10 @@
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
-import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
@@ -137,8 +137,6 @@
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
- private String vertexCheckpointPath;
-
public JobGen(PregelixJob job) {
init(job);
}
@@ -148,10 +146,7 @@
pregelixJob = job;
initJobConfiguration();
job.setJobId(jobId);
-
- vertexCheckpointPath = "/tmp/ckpoint/" + jobId + "/vertex";
- // set the frame size to be the one user specified if the user did
- // specify.
+ // set the frame size to be the one user specified if the user did specify.
int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
if (specifiedFrameSize > 0) {
frameSize = specifiedFrameSize;
@@ -376,15 +371,21 @@
@Override
public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
try {
-
PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
- FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath + "/" + lastSuccessfulIteration));
+ FileOutputFormat.setOutputPath(tmpJob,
+ new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)));
tmpJob.setOutputKeyClass(NullWritable.class);
tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
+ FileSystem dfs = FileSystem.get(tmpJob.getConfiguration());
+
+ dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)), true);
JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+
+ dfs.delete(new Path(BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration)), true);
JobSpecification[] stateCkpSpecs = generateStateCheckpointing(lastSuccessfulIteration);
JobSpecification[] specs = new JobSpecification[1 + stateCkpSpecs.length];
+
specs[0] = vertexCkpSpec;
for (int i = 1; i < specs.length; i++) {
specs[i] = stateCkpSpecs[i - 1];
@@ -397,7 +398,7 @@
@Override
public JobSpecification generateLoadingJob() throws HyracksException {
- JobSpecification spec = loadHDFSData(conf);
+ JobSpecification spec = loadHDFSData(pregelixJob);
return spec;
}
@@ -412,13 +413,22 @@
try {
PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
- FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath + "/" + lastCheckpointedIteration));
- JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
+ FileInputFormat.setInputPaths(tmpJob,
+ new Path(BspUtils.getVertexCheckpointPath(conf, lastCheckpointedIteration)));
+ JobSpecification[] cleanVertices = generateCleanup();
+ JobSpecification createIndex = generateCreatingJob();
+ JobSpecification vertexLoadSpec = loadHDFSData(tmpJob);
JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
- JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
- specs[0] = vertexLoadSpec;
- for (int i = 1; i < specs.length; i++) {
- specs[i] = stateLoadSpecs[i - 1];
+ JobSpecification[] specs = new JobSpecification[cleanVertices.length + 2 + stateLoadSpecs.length];
+
+ int i = 0;
+ for (; i < cleanVertices.length; i++) {
+ specs[i] = cleanVertices[i];
+ }
+ specs[i++] = createIndex;
+ specs[i++] = vertexLoadSpec;
+ for (; i < specs.length; i++) {
+ specs[i] = stateLoadSpecs[i - cleanVertices.length - 2];
}
return specs;
} catch (Exception e) {
@@ -479,7 +489,8 @@
}
@SuppressWarnings({ "rawtypes", "unchecked" })
- private JobSpecification loadHDFSData(Configuration conf) throws HyracksException, HyracksDataException {
+ private JobSpecification loadHDFSData(PregelixJob job) throws HyracksException, HyracksDataException {
+ Configuration conf = job.getConfiguration();
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
JobSpecification spec = new JobSpecification();
@@ -492,7 +503,7 @@
VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
List<InputSplit> splits = new ArrayList<InputSplit>();
try {
- splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
+ splits = inputFormat.getSplits(job, fileSplitProvider.getFileSplits().length);
LOGGER.info("number of splits: " + splits.size());
for (InputSplit split : splits)
LOGGER.info(split.toString());
@@ -591,7 +602,8 @@
*/
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
vertexIdClass.getName(), vertexClass.getName());
- HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
+ VertexFileWriteOperatorDescriptor writer = new VertexFileWriteOperatorDescriptor(spec, confFactory,
+ inputRdFactory);
ClusterConfig.setLocationConstraint(spec, writer);
/**
@@ -637,18 +649,21 @@
/**
* construct the materializing write operator
*/
- MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+ false);
ClusterConfig.setLocationConstraint(spec, materializeRead);
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastSuccessfulIteration;
+ String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);
PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
- tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+ tmpJob.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
tmpJob.setOutputKeyClass(vertexIdClass);
tmpJob.setOutputValueClass(MsgList.class);
- ITupleWriterFactory writerFactory = new KeyValueWriterFactory(new ConfFactory(tmpJob));
- HDFSWriteOperatorDescriptor hdfsWriter = new HDFSWriteOperatorDescriptor(spec, tmpJob, writerFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), MsgList.class.getName());
+ HDFSFileWriteOperatorDescriptor hdfsWriter = new HDFSFileWriteOperatorDescriptor(spec, tmpJob, inputRdFactory);
+ ClusterConfig.setLocationConstraint(spec, hdfsWriter);
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, hdfsWriter, 0);
@@ -660,25 +675,26 @@
@SuppressWarnings({ "unchecked", "rawtypes" })
protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
throws HyracksException {
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastCheckpointedIteration;
+ String checkpointPath = BspUtils.getMessageCheckpointPath(job.getConfiguration(), lastCheckpointedIteration);
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
- tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+ tmpJob.setInputFormatClass(SequenceFileInputFormat.class);
try {
FileInputFormat.setInputPaths(tmpJob, checkpointPath);
} catch (IOException e) {
throw new HyracksException(e);
}
- Configuration conf = job.getConfiguration();
+ Configuration conf = tmpJob.getConfiguration();
Class vertexIdClass = BspUtils.getVertexIndexClass(conf);
JobSpecification spec = new JobSpecification();
/***
* HDFS read operator
*/
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
List<InputSplit> splits = new ArrayList<InputSplit>();
try {
- splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+ InputFormat inputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(job.getInputFormatClass(),
+ job.getConfiguration());
+ splits = inputFormat.getSplits(tmpJob);
LOGGER.info("number of splits: " + splits.size());
for (InputSplit split : splits)
LOGGER.info(split.toString());
@@ -692,6 +708,16 @@
readSchedule, new KeyValueParserFactory());
ClusterConfig.setLocationConstraint(spec, scanner);
+ /** construct the sort operator to sort message states */
+ int[] keyFields = new int[] { 0 };
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration,
+ WritableComparator.get(vertexIdClass).getClass());
+ ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
+ nkmFactory, sortCmpFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sort);
+
/**
* construct the materializing write operator
*/
@@ -701,7 +727,7 @@
/** construct runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
- new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration + 1, new ConfigurationFactory(
+ new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration, new ConfigurationFactory(
pregelixJob.getConfiguration())));
ClusterConfig.setLocationConstraint(spec, postSuperStep);
@@ -714,7 +740,8 @@
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
- materialize, 0);
+ sort, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
spec.setFrameSize(frameSize);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 41887c0..9389f62 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -21,15 +21,17 @@
import java.util.List;
import java.util.logging.Logger;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -49,17 +51,12 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.pregelix.api.graph.MsgList;
-import edu.uci.ics.pregelix.api.io.VertexInputFormat;
-import edu.uci.ics.pregelix.api.io.internal.InternalVertexInputFormat;
-import edu.uci.ics.pregelix.api.io.internal.InternalVertexOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
@@ -71,8 +68,8 @@
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
-import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -344,7 +341,8 @@
/**
* construct the materializing write operator
*/
- MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+ true);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
@@ -525,14 +523,10 @@
/** generate plan specific state checkpointing */
protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
JobSpecification[] msgCkpSpecs = super.generateStateCheckpointing(lastSuccessfulIteration);
- PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
- tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
/** generate secondary index checkpoint */
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;
- FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
- tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
- tmpJob.setOutputValueClass(MsgList.class);
+ PregelixJob tmpJob = this.createCloneJob("Secondary index checkpointing for job " + jobId, pregelixJob);
+
JobSpecification secondaryBTreeCkp = generateSecondaryBTreeCheckpoint(lastSuccessfulIteration, tmpJob);
JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
@@ -549,11 +543,12 @@
@Override
protected JobSpecification[] generateStateCheckpointLoading(int lastSuccessfulIteration, PregelixJob job)
throws HyracksException {
- JobSpecification[] msgCkpSpecs = generateStateCheckpointLoading(lastSuccessfulIteration, job);
- PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
- tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+ /** generate message checkpoint load */
+ JobSpecification[] msgCkpSpecs = super.generateStateCheckpointLoading(lastSuccessfulIteration, job);
/** generate secondary index checkpoint load */
+ PregelixJob tmpJob = this.createCloneJob("Secondary index checkpoint loading for job " + jobId, pregelixJob);
+ tmpJob.setOutputFormatClass(SequenceFileOutputFormat.class);
JobSpecification secondaryBTreeCkpLoad = generateSecondaryBTreeCheckpointLoad(lastSuccessfulIteration, tmpJob);
JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
for (int i = 0; i < msgCkpSpecs.length; i++) {
@@ -569,23 +564,23 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
JobSpecification spec = new JobSpecification();
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;;
+ String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
- tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+ tmpJob.setInputFormatClass(SequenceFileInputFormat.class);
try {
FileInputFormat.setInputPaths(tmpJob, checkpointPath);
} catch (IOException e) {
throw new HyracksException(e);
}
- Configuration conf = job.getConfiguration();
/***
- * construct HDFS read operator
+ * HDFS read operator
*/
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
List<InputSplit> splits = new ArrayList<InputSplit>();
try {
- splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+ InputFormat inputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(job.getInputFormatClass(),
+ job.getConfiguration());
+ splits = inputFormat.getSplits(tmpJob);
LOGGER.info("number of splits: " + splits.size());
for (InputSplit split : splits)
LOGGER.info(split.toString());
@@ -599,6 +594,16 @@
readSchedule, new KeyValueParserFactory());
ClusterConfig.setLocationConstraint(spec, scanner);
+ /** construct the sort operator to sort message states */
+ int[] keyFields = new int[] { 0 };
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration,
+ WritableComparator.get(vertexIdClass).getClass());
+ ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
+ nkmFactory, sortCmpFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sort);
+
/**
* construct bulk-load index operator
*/
@@ -621,8 +626,8 @@
* connect operator descriptors
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
- btreeBulkLoad, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, btreeBulkLoad, 0);
spec.setFrameSize(frameSize);
return spec;
@@ -631,6 +636,12 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
private JobSpecification generateSecondaryBTreeCheckpoint(int lastSuccessfulIteration, PregelixJob job)
throws HyracksException {
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
+ FileOutputFormat.setOutputPath(job, new Path(checkpointPath));
+ job.setOutputKeyClass(BspUtils.getVertexIndexClass(job.getConfiguration()));
+ job.setOutputValueClass(MsgList.class);
+
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
Class<? extends Writable> msgListClass = MsgList.class;
String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
@@ -654,7 +665,6 @@
/**
* construct btree search operator
*/
- ConfFactory confFactory = new ConfFactory(job);
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), msgListClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
@@ -674,8 +684,9 @@
/**
* construct write file operator
*/
- HDFSWriteOperatorDescriptor writer = new HDFSWriteOperatorDescriptor(spec, job, new KeyValueWriterFactory(
- confFactory));
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), MsgList.class.getName());
+ HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
ClusterConfig.setLocationConstraint(spec, writer);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index c29ea18..287b797 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -299,7 +299,8 @@
/**
* construct the materializing write operator
*/
- MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+ true);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index dc61971..3b3c9e7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -288,7 +288,8 @@
/**
* construct the materializing write operator
*/
- MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+ true);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 34f723f..e334095 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -302,7 +302,8 @@
/**
* construct the materializing write operator
*/
- MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+ true);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index ea6cc8a..89fbdcd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -20,12 +20,15 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -52,6 +55,8 @@
private static Map<String, List<String>> ipToNcMapping;
private static String[] stores;
private static Scheduler hdfsScheduler;
+ private static Set<String> blackListNodes = new HashSet<String>();
+ private static IHyracksClientConnection hcc;
/**
* let tests set config path to be whatever
@@ -197,9 +202,19 @@
public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
try {
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ if (hcc == null) {
+ hcc = new HyracksConnection(ipAddress, port);
+ }
Map<String, NodeControllerInfo> ncNameToNcInfos = new TreeMap<String, NodeControllerInfo>();
ncNameToNcInfos.putAll(hcc.getNodeControllerInfos());
+
+ /**
+ * remove black list nodes -- which had disk failures
+ */
+ for (String blackListNode : blackListNodes) {
+ ncNameToNcInfos.remove(blackListNode);
+ }
+
NCs = new String[ncNameToNcInfos.size()];
ipToNcMapping = new HashMap<String, List<String>>();
int i = 0;
@@ -216,7 +231,7 @@
i++;
}
- hdfsScheduler = new Scheduler(ipAddress, port);
+ hdfsScheduler = new Scheduler(hcc.getNodeControllerInfos(), hcc.getClusterTopology());
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -240,4 +255,8 @@
}
return locations;
}
+
+ public static void addToBlackListNodes(Collection<String> nodes) {
+ blackListNodes.addAll(nodes);
+ }
}
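The blacklist is consumed on the next loadClusterConfig call, which now drops blacklisted node controllers before computing location constraints. A sketch of the interaction (node id hypothetical; both methods appear in this patch, exception handling elided):

    Set<String> deadNodes = new HashSet<String>();
    deadNodes.add("nc1");   // hypothetical node reported by a disk failure
    ClusterConfig.addToBlackListNodes(deadNodes);
    // subsequent reloads exclude nc1 from NCs / ipToNcMapping:
    ClusterConfig.loadClusterConfig(ipAddress, port);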
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
new file mode 100644
index 0000000..a4c4501
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+import java.io.IOException;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A utility for analyzing exceptions
+ *
+ * @author yingyib
+ */
+public class ExceptionUtilities {
+
+ /**
+ * Check whether an exception is recoverable or not
+ *
+ * @param exception
+ * @param blackListNodes the set to which ids of failed nodes are added
+ * @return true if the exception is recoverable; false otherwise
+ */
+ public static boolean recoverable(Exception exception, Set<String> blackListNodes) {
+ // guard against exceptions that carry no message
+ String message = exception.getMessage() == null ? "" : exception.getMessage();
+
+ /***
+ * check interrupted exception
+ */
+ if (exception instanceof InterruptedException || (message.contains("Node") && message.contains("not live"))
+ || message.contains("Failure occurred on input")) {
+ return true;
+ }
+ Throwable cause = exception;
+ while ((cause = cause.getCause()) != null) {
+ if (cause instanceof InterruptedException) {
+ return true;
+ }
+ }
+
+ /***
+ * check io exception
+ */
+ cause = exception;
+ String blackListNode = null;
+ if (cause instanceof HyracksDataException) {
+ blackListNode = ((HyracksDataException) cause).getNodeId();
+ }
+ while ((cause = cause.getCause()) != null) {
+ if (cause instanceof IOException) {
+ if (containsIOManager(cause)) {
+ if (blackListNode != null) {
+ blackListNodes.add(blackListNode);
+ }
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Check if the exception traces contains the IOManager, which means there are disk failures
+ *
+ * @param cause
+ * @return true if IOManager is in the trace; false otherwise.
+ */
+ private static boolean containsIOManager(Throwable cause) {
+ StackTraceElement[] traces = cause.getStackTrace();
+ for (StackTraceElement e : traces) {
+ if (e.getClassName().endsWith("IOManager")) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
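How the classifier behaves on the two failure families it targets, as a sketch (node id hypothetical; setNodeId is the setter assumed to pair with the getNodeId call above):

    Set<String> blackList = new HashSet<String>();

    // node failure: interrupted threads or "Node ... not live" messages
    ExceptionUtilities.recoverable(new InterruptedException(), blackList);  // true

    // disk failure: an IOException whose stack trace passes through an IOManager;
    // in that case the node stamped on the HyracksDataException is blacklisted
    HyracksDataException hde = new HyracksDataException(new IOException("disk dead"));
    hde.setNodeId("nc1");
    boolean recoverable = ExceptionUtilities.recoverable(hde, blackList);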
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index e1795de..98be8cd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -61,8 +61,8 @@
ccConfig.defaultMaxJobAttempts = 0;
ccConfig.jobHistorySize = 1;
ccConfig.profileDumpPeriod = -1;
- //ccConfig.heartbeatPeriod = 5000;
- //ccConfig.maxHeartbeatLapsePeriods = 1;
+ ccConfig.heartbeatPeriod = 1000;
+ ccConfig.maxHeartbeatLapsePeriods = 15;
// cluster controller
cc = new ClusterControllerService(ccConfig);
@@ -98,14 +98,22 @@
ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
}
- public static void showDownNC1() throws Exception {
+ public static void startNC1() throws Exception {
+ nc1.start();
+ }
+
+ public static void shutdownNC1() throws Exception {
nc1.stop();
}
- public static void showDownNC2() throws Exception {
+ public static void shutdownNC2() throws Exception {
nc2.stop();
}
+ public static void shutdownCC() throws Exception {
+ cc.stop();
+ }
+
public static void deinit() throws Exception {
nc2.stop();
nc1.stop();
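The renamed and added lifecycle methods make single-node failure injection scriptable from tests. A hypothetical test fragment (the real tests live under pregelix-example, e.g. FailureRecoveryConnectedComponentsTest):

    // kill one NC mid-run, let the Driver blacklist it and retry from the
    // last checkpoint, then restore the cluster for the next test
    PregelixHyracksIntegrationUtil.shutdownNC1();
    // ... the job re-runs here; recovery exercises checkpoint loading ...
    PregelixHyracksIntegrationUtil.startNC1();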
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 7221cb5..b22e468 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -230,8 +230,16 @@
@Override
public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers)
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
+ for (IFrameWriter writer : writers) {
writer.fail();
+ }
}
@Override
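This same fail() hardening recurs across the operator pushables below: close the cursor first, always release the index helper, and only then propagate failure downstream. The pattern from this patch, annotated:

    @Override
    public void fail() throws HyracksDataException {
        try {
            cursor.close();              // stop the in-flight index scan
        } catch (Exception e) {
            throw new HyracksDataException(e);
        } finally {
            treeIndexOpHelper.close();   // release the index handle unconditionally
        }
        for (IFrameWriter writer : writers) {
            writer.fail();               // let downstream operators clean up too
        }
    }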
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index b21cd2a..0ecfd03 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -254,8 +254,16 @@
@Override
public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers)
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
+ for (IFrameWriter writer : writers) {
writer.fail();
+ }
}
/** compare tuples */
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index dd6ee3c..e64e9cc 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -245,6 +245,13 @@
@Override
public void fail() throws HyracksDataException {
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
writer.fail();
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 3cebfb8..a9c787f 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -219,8 +219,16 @@
@Override
public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers)
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
+ for (IFrameWriter writer : writers) {
writer.fail();
+ }
}
/** compare tuples */
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index bbe2764..86a211f 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -205,6 +205,13 @@
@Override
public void fail() throws HyracksDataException {
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
writer.fail();
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
index c4890e1..c985f64 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -100,6 +100,11 @@
@Override
public void fail() throws HyracksDataException {
-
+ try {
+ bulkLoader.end();
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
}
}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index bd85e3e..de87909 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -224,8 +224,16 @@
@Override
public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers)
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexHelper.close();
+ }
+ for (IFrameWriter writer : writers) {
writer.fail();
+ }
}
@Override
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 9a21680..a1177c8 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -19,42 +19,45 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.hdfs.ContextFactory;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
public class HDFSFileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private final IConfigurationFactory confFactory;
+ private final ConfFactory confFactory;
private final IRecordDescriptorFactory inputRdFactory;
- public HDFSFileWriteOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
- IRecordDescriptorFactory inputRdFactory) {
+ public HDFSFileWriteOperatorDescriptor(JobSpecification spec, Job conf, IRecordDescriptorFactory inputRdFactory)
+ throws HyracksException {
super(spec, 1, 0);
- this.confFactory = confFactory;
- this.inputRdFactory = inputRdFactory;
+ try {
+ this.confFactory = new ConfFactory(conf);
+ this.inputRdFactory = inputRdFactory;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
}
@SuppressWarnings("rawtypes")
@@ -65,12 +68,12 @@
return new AbstractUnaryInputSinkOperatorNodePushable() {
private RecordDescriptor rd0;
private FrameDeserializer frameDeserializer;
- private Configuration conf;
- private VertexWriter vertexWriter;
+ private Job job;
+ private RecordWriter recordWriter;
private TaskAttemptContext context;
+ private ContextFactory ctxFactory = new ContextFactory();
private String TEMP_DIR = "_temporary";
private ClassLoader ctxCL;
- private ContextFactory ctxFactory = new ContextFactory();
@Override
public void open() throws HyracksDataException {
@@ -79,16 +82,16 @@
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- conf = confFactory.createConfiguration(ctx);
-
- VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
- context = ctxFactory.createContext(conf, partition);
- context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+ job = confFactory.getConf();
try {
- vertexWriter = outputFormat.createVertexWriter(context);
+ OutputFormat outputFormat = ReflectionUtils.newInstance(job.getOutputFormatClass(),
+ job.getConfiguration());
+ context = ctxFactory.createContext(job.getConfiguration(), partition);
+ context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+ recordWriter = outputFormat.getRecordWriter(context);
} catch (InterruptedException e) {
throw new HyracksDataException(e);
- } catch (IOException e) {
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
}
@@ -100,8 +103,9 @@
try {
while (!frameDeserializer.done()) {
Object[] tuple = frameDeserializer.deserializeRecord();
- Vertex value = (Vertex) tuple[1];
- vertexWriter.writeVertex(value);
+ Object key = tuple[0];
+ Object value = tuple[1];
+ recordWriter.write(key, value);
}
} catch (InterruptedException e) {
throw new HyracksDataException(e);
@@ -118,7 +122,7 @@
@Override
public void close() throws HyracksDataException {
try {
- vertexWriter.close(context);
+ recordWriter.close(context);
moveFilesToFinalPath();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
@@ -129,9 +133,8 @@
private void moveFilesToFinalPath() throws HyracksDataException {
try {
- JobContext job = ctxFactory.createJobContext(conf);
Path outputPath = FileOutputFormat.getOutputPath(job);
- FileSystem dfs = FileSystem.get(conf);
+ FileSystem dfs = FileSystem.get(job.getConfiguration());
Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
FileStatus[] results = findPartitionPaths(outputPath, dfs);
if (results.length >= 1) {
@@ -161,7 +164,7 @@
FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
@Override
public boolean accept(Path dir) {
- return dir.getName().endsWith(TEMP_DIR);
+ return dir.getName().endsWith(TEMP_DIR) && dir.getName().indexOf(".crc") < 0;
}
});
Path tempDir = tempPaths[0].getPath();
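Net effect of this rewrite, sketched: the operator is now format-agnostic, instantiating whatever Hadoop OutputFormat the job configures instead of going through a Pregelix VertexWriter (names from this patch; error handling elided):

    Job job = confFactory.getConf();
    OutputFormat outputFormat = ReflectionUtils.newInstance(
            job.getOutputFormatClass(), job.getConfiguration());
    TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), partition);
    RecordWriter recordWriter = outputFormat.getRecordWriter(context);
    recordWriter.write(key, value);   // e.g. vertex id + MsgList for message checkpoints
    recordWriter.close(context);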
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
index ca8f190..b44b643 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
@@ -30,9 +30,12 @@
public class MaterializingReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
+ private final boolean removeIterationState;
- public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+ public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor,
+ boolean removeIterationState) {
super(spec, 1, 1);
+ this.removeIterationState = removeIterationState;
recordDescriptors[0] = recordDescriptor;
}
@@ -73,7 +76,7 @@
@Override
public void fail() throws HyracksDataException {
-
+ writer.fail();
}
@Override
@@ -81,7 +84,9 @@
/**
* remove last iteration's state
*/
- IterationUtils.removeIterationState(ctx, partition);
+ if (removeIterationState) {
+ IterationUtils.removeIterationState(ctx, partition);
+ }
writer.close();
complete = true;
}
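The new flag separates the two call sites this patch introduces: the iteration pipeline still discards the previous iteration's materialized state, while checkpointing jobs must read that state non-destructively. As wired in the job generators above:

    // inside the per-iteration plan (JobGenInnerJoin / JobGenOuterJoin*):
    new MaterializingReadOperatorDescriptor(spec, rdFinal, true);   // drop old state
    // inside generateStateCheckpointing (JobGen):
    new MaterializingReadOperatorDescriptor(spec, rdFinal, false);  // keep state intact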
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
new file mode 100644
index 0000000..f3ec40e
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+
+public class VertexFileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private final IConfigurationFactory confFactory;
+ private final IRecordDescriptorFactory inputRdFactory;
+
+ public VertexFileWriteOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
+ IRecordDescriptorFactory inputRdFactory) {
+ super(spec, 1, 0);
+ this.confFactory = confFactory;
+ this.inputRdFactory = inputRdFactory;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+ private RecordDescriptor rd0;
+ private FrameDeserializer frameDeserializer;
+ private Configuration conf;
+ private VertexWriter vertexWriter;
+ private TaskAttemptContext context;
+ private String TEMP_DIR = "_temporary";
+ private ClassLoader ctxCL;
+ private ContextFactory ctxFactory = new ContextFactory();
+
+ @Override
+ public void open() throws HyracksDataException {
+ rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+ : inputRdFactory.createRecordDescriptor(ctx);
+ frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ conf = confFactory.createConfiguration(ctx);
+
+ VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
+ context = ctxFactory.createContext(conf, partition);
+ context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+ try {
+ vertexWriter = outputFormat.createVertexWriter(context);
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+ frameDeserializer.reset(frame);
+ try {
+ while (!frameDeserializer.done()) {
+ Object[] tuple = frameDeserializer.deserializeRecord();
+ Vertex value = (Vertex) tuple[1];
+ vertexWriter.writeVertex(value);
+ }
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ vertexWriter.close(context);
+ moveFilesToFinalPath();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private void moveFilesToFinalPath() throws HyracksDataException {
+ try {
+ JobContext job = ctxFactory.createJobContext(conf);
+ Path outputPath = FileOutputFormat.getOutputPath(job);
+ FileSystem dfs = FileSystem.get(conf);
+ Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
+ FileStatus[] results = findPartitionPaths(outputPath, dfs);
+ if (results.length >= 1) {
+ /**
+ * for Hadoop-0.20.2
+ */
+ renameFile(dfs, filePath, results);
+ } else {
+ /**
+ * for Hadoop-0.23.1
+ */
+ int jobId = job.getJobID().getId();
+ outputPath = new Path(outputPath.toString() + File.separator + TEMP_DIR + File.separator
+ + jobId);
+ results = findPartitionPaths(outputPath, dfs);
+ renameFile(dfs, filePath, results);
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+ private FileStatus[] findPartitionPaths(Path outputPath, FileSystem dfs) throws FileNotFoundException,
+ IOException {
+ FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
+ @Override
+ public boolean accept(Path dir) {
+ return dir.getName().endsWith(TEMP_DIR) && dir.getName().indexOf(".crc") < 0;
+ }
+ });
+ Path tempDir = tempPaths[0].getPath();
+ FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
+ @Override
+ public boolean accept(Path dir) {
+ return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0
+ && dir.getName().indexOf(".crc") < 0;
+ }
+ });
+ return results;
+ }
+
+ private void renameFile(FileSystem dfs, Path filePath, FileStatus[] results) throws IOException,
+ HyracksDataException, FileNotFoundException {
+ Path srcDir = results[0].getPath();
+ if (!dfs.exists(srcDir))
+ throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
+
+ FileStatus[] srcFiles = dfs.listStatus(srcDir);
+ Path srcFile = srcFiles[0].getPath();
+ dfs.delete(filePath, true);
+ dfs.rename(srcFile, filePath);
+ }
+
+ };
+ }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index bfe89ab..f3f7513 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -147,11 +147,15 @@
return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
}
- public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, int currentIteration) {
+ public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration) {
Boolean toMove = jobIdToMove.get(jobId);
if (toMove == null || toMove == true) {
if (jobIdToSuperStep.get(jobId) == null) {
- jobIdToSuperStep.put(jobId, 0L);
+ if (currentIteration <= 0) {
+ jobIdToSuperStep.put(jobId, 0L);
+ } else {
+ jobIdToSuperStep.put(jobId, currentIteration);
+ }
}
long superStep = jobIdToSuperStep.get(jobId);
@@ -175,6 +179,35 @@
System.gc();
}
+ public synchronized void recoverVertexProperties(String jobId, long numVertices, long numEdges,
+ long currentIteration) {
+ if (jobIdToSuperStep.get(jobId) == null) {
+ if (currentIteration <= 0) {
+ jobIdToSuperStep.put(jobId, 0L);
+ } else {
+ jobIdToSuperStep.put(jobId, currentIteration);
+ }
+ }
+
+ long superStep = jobIdToSuperStep.get(jobId);
+ List<FileReference> files = iterationToFiles.remove(superStep - 1);
+ if (files != null) {
+ for (FileReference fileRef : files)
+ fileRef.delete();
+ }
+
+ if (currentIteration > 0) {
+ Vertex.setSuperstep(currentIteration);
+ } else {
+ Vertex.setSuperstep(++superStep);
+ }
+ Vertex.setNumVertices(numVertices);
+ Vertex.setNumEdges(numEdges);
+ jobIdToSuperStep.put(jobId, superStep);
+ jobIdToMove.put(jobId, true);
+ LOGGER.info("recovered iteration " + Vertex.getSuperstep());
+ }
+
public synchronized void endSuperStep(String pregelixJobId) {
jobIdToMove.put(pregelixJobId, true);
LOGGER.info("end iteration " + Vertex.getSuperstep());
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 1cf81ac..02097bf 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -75,13 +75,21 @@
context.endSuperStep(giraphJobId);
}
- public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, int currentIteration) {
+ public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, long currentIteration) {
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
context.setVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
}
+ public static void recoverProperties(String jobId, IHyracksTaskContext ctx, Configuration conf,
+ long currentIteration) {
+ INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+ RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+ context.recoverVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+ conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
+ }
+
public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
throws HyracksDataException {
try {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java
new file mode 100644
index 0000000..efc7bcc
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryConnectedComponentsTest {
+ private static String INPUTPATH = "data/webmapcomplex";
+ private static String OUTPUTPAH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/ConnectedComponentsRealComplex2";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+ try {
+ PregelixJob job = new PregelixJob(ConnectedComponentsVertex.class.getName());
+            job.setVertexClass(ConnectedComponentsVertex.class);
+ job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+ job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+ job.setDynamicVertexValueSize(true);
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+ testCluster.setUp();
+            Driver driver = new Driver(ConnectedComponentsVertex.class);
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (Vertex.getSuperstep() <= 5) {
+ this.wait(200);
+ }
+ PregelixHyracksIntegrationUtil.shutdownNC1();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+ thread.start();
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+ } catch (Exception e) {
+ PregelixHyracksIntegrationUtil.shutdownNC2();
+ testCluster.cleanupHDFS();
+ throw e;
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
new file mode 100644
index 0000000..ff1e29f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryInnerJoinTest {
+ private static String INPUTPATH = "data/webmap";
+ private static String OUTPUTPAH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal2";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+
+ try {
+ PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+ job.setVertexClass(PageRankVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+ testCluster.setUp();
+ Driver driver = new Driver(PageRankVertex.class);
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (Vertex.getSuperstep() <= 5) {
+ this.wait(200);
+ }
+ PregelixHyracksIntegrationUtil.shutdownNC1();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+ thread.start();
+ driver.runJob(job, Plan.INNER_JOIN, "127.0.0.1",
+ PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+ } catch (Exception e) {
+ PregelixHyracksIntegrationUtil.shutdownNC2();
+ testCluster.cleanupHDFS();
+ throw e;
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
index dac087f..3fdaf15 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
+import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.core.driver.Driver;
@@ -37,7 +38,7 @@
public class FailureRecoveryTest {
private static String INPUTPATH = "data/webmap";
private static String OUTPUTPAH = "actual/result";
- private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal2";
@Test
public void test() throws Exception {
@@ -57,29 +58,30 @@
testCluster.setUp();
Driver driver = new Driver(PageRankVertex.class);
- // Thread thread = new Thread(new Runnable() {
- //
- // @Override
- // public void run() {
- // try {
- // synchronized (this) {
- // this.wait(10000);
- // PregelixHyracksIntegrationUtil.showDownNC1();
- // }
- // } catch (Exception e) {
- // throw new IllegalStateException(e);
- // }
- // }
- //
- // });
- //thread.start();
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (Vertex.getSuperstep() <= 5) {
+ this.wait(200);
+ }
+ PregelixHyracksIntegrationUtil.shutdownNC1();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+ thread.start();
driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
} catch (Exception e) {
+ PregelixHyracksIntegrationUtil.shutdownNC2();
+ testCluster.cleanupHDFS();
throw e;
- } finally {
- testCluster.tearDown();
}
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
new file mode 100644
index 0000000..e006ccd
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryWithoutCheckpointTest {
+ private static String INPUTPATH = "data/webmap";
+ private static String OUTPUTPAH = "actual/result";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal2";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+
+ try {
+ PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+ job.setVertexClass(PageRankVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+
+ testCluster.setUp();
+ Driver driver = new Driver(PageRankVertex.class);
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (Vertex.getSuperstep() <= 5) {
+ this.wait(200);
+ }
+ PregelixHyracksIntegrationUtil.shutdownNC1();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+ thread.start();
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+ TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+ } catch (Exception e) {
+ PregelixHyracksIntegrationUtil.shutdownNC2();
+ testCluster.cleanupHDFS();
+ throw e;
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertex.java
similarity index 100%
rename from pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/FailureVertex.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertex.java
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
index 5a485ba..dc7a28d 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
@@ -24,6 +24,7 @@
import org.junit.Test;
import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
@@ -55,7 +56,7 @@
job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job1, INPUTPATH);
job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- //job1.setCheckpointHook(ConservativeCheckpointHook.class);
+ job1.setCheckpointHook(ConservativeCheckpointHook.class);
PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
job2.setVertexClass(PageRankVertex.class);
@@ -65,7 +66,7 @@
job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- //job2.setCheckpointHook(ConservativeCheckpointHook.class);
+ job2.setCheckpointHook(ConservativeCheckpointHook.class);
jobs.add(job1);
jobs.add(job2);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
index 40ea690..660d9eb 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
@@ -126,7 +126,7 @@
/**
* cleanup hdfs cluster
*/
- private void cleanupHDFS() throws Exception {
+ public void cleanupHDFS() throws Exception {
dfsCluster.shutdown();
}
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0
new file mode 100755
index 0000000..2c975de
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0
@@ -0,0 +1,10 @@
+0 0
+2 0
+4 0
+6 0
+8 0
+10 0
+12 0
+14 0
+16 0
+18 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1
new file mode 100755
index 0000000..6976bc1
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1
@@ -0,0 +1,13 @@
+1 0
+3 0
+5 0
+7 0
+9 0
+11 0
+13 0
+15 0
+17 0
+19 0
+21 21
+25 25
+27 27
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-0 b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-0
new file mode 100755
index 0000000..d135b86
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-0
@@ -0,0 +1,10 @@
+0 0.008290140026154316
+2 0.14646839195826472
+4 0.03976979906329426
+6 0.015736276824953852
+8 0.010628239626209894
+10 0.008290140026154316
+12 0.14646839195826472
+14 0.03976979906329426
+16 0.015736276824953852
+18 0.010628239626209894
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-1 b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-1
new file mode 100755
index 0000000..d3badee
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal2/part-1
@@ -0,0 +1,10 @@
+1 0.15351528192471647
+3 0.08125113985998214
+5 0.0225041581462058
+7 0.012542224114863661
+9 0.009294348455354817
+11 0.15351528192471647
+13 0.08125113985998214
+15 0.0225041581462058
+17 0.012542224114863661
+19 0.009294348455354817
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
index 35e7cd8..4720272 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
@@ -30,11 +30,11 @@
*/
public class RecoveryRuntimeHookFactory implements IRuntimeHookFactory {
private static final long serialVersionUID = 1L;
- private final int currentSuperStep;
+ private final long currentSuperStep;
private String jobId;
private IConfigurationFactory confFactory;
- public RecoveryRuntimeHookFactory(String jobId, int currentSuperStep, IConfigurationFactory confFactory) {
+ public RecoveryRuntimeHookFactory(String jobId, long currentSuperStep, IConfigurationFactory confFactory) {
this.currentSuperStep = currentSuperStep;
this.jobId = jobId;
this.confFactory = confFactory;
@@ -48,7 +48,7 @@
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
IterationUtils.endSuperStep(jobId, ctx);
Configuration conf = confFactory.createConfiguration(ctx);
- IterationUtils.setProperties(jobId, ctx, conf, currentSuperStep);
+ IterationUtils.recoverProperties(jobId, ctx, conf, currentSuperStep);
}
};