Added sort-merge collector. tests pass
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@389 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
index a7646fc..8584d36 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -22,7 +22,7 @@
public interface IHyracksTaskContext extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry {
public IHyracksJobletContext getJobletContext();
- public TaskAttemptId getTaskId();
+ public TaskAttemptId getTaskAttemptId();
public ICounterContext getCounterContext();
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index aebd6ce..4210a23 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -206,8 +206,8 @@
@Override
public void notifyTaskFailure(UUID jobId, TaskAttemptId taskId, String nodeId, Exception exception)
throws Exception {
- TaskFailureEvent sfe = new TaskFailureEvent(this, jobId, taskId, nodeId, exception);
- jobQueue.schedule(sfe);
+ TaskFailureEvent tfe = new TaskFailureEvent(this, jobId, taskId, nodeId, exception);
+ jobQueue.schedule(tfe);
}
@Override
@@ -240,9 +240,6 @@
@Override
public synchronized void nodeHeartbeat(String id) throws Exception {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Heartbeat from: " + id);
- }
jobQueue.schedule(new NodeHeartbeatEvent(this, id));
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index 1e9261b..0c515f4 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -20,6 +20,7 @@
import java.util.Set;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.scheduler.IActivityClusterStateMachine;
public class ActivityCluster {
@@ -31,7 +32,7 @@
private final Set<ActivityCluster> dependents;
- private final Map<ActivityNodeId, TaskState[]> taskStateMap;
+ private final Map<ActivityNodeId, Task[]> taskStateMap;
private TaskCluster[] taskClusters;
@@ -42,7 +43,7 @@
this.activities = activities;
dependencies = new HashSet<ActivityCluster>();
dependents = new HashSet<ActivityCluster>();
- taskStateMap = new HashMap<ActivityNodeId, TaskState[]>();
+ taskStateMap = new HashMap<ActivityNodeId, Task[]>();
}
public Set<ActivityNodeId> getActivities() {
@@ -61,7 +62,7 @@
return dependencies;
}
- public Map<ActivityNodeId, TaskState[]> getTaskStateMap() {
+ public Map<ActivityNodeId, Task[]> getTaskStateMap() {
return taskStateMap;
}
@@ -84,4 +85,16 @@
public JobRun getJobRun() {
return jobRun;
}
+
+ public int getMaxTaskClusterAttempts() {
+ return 1;
+ }
+
+ public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException {
+ acsm.notifyTaskClusterFailure(tcAttempt, exception);
+ }
+
+ public void notifyActivityClusterComplete() throws HyracksException {
+ jobRun.getStateMachine().notifyActivityClusterComplete(this);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
similarity index 72%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskState.java
rename to hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
index 3bbba36..3ca75b8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
@@ -15,20 +15,28 @@
package edu.uci.ics.hyracks.control.cc.job;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
-public class TaskState {
+public class Task {
private final TaskId taskId;
+ private final ActivityPartitionDetails apd;
+
private TaskCluster taskCluster;
- public TaskState(TaskId taskId) {
+ public Task(TaskId taskId, ActivityPartitionDetails apd) {
this.taskId = taskId;
+ this.apd = apd;
}
public TaskId getTaskId() {
return taskId;
}
+ public ActivityPartitionDetails getActivityPartitionDetails() {
+ return apd;
+ }
+
public TaskCluster getTaskCluster() {
return taskCluster;
}
@@ -36,4 +44,9 @@
public void setTaskCluster(TaskCluster taskCluster) {
this.taskCluster = taskCluster;
}
+
+ @Override
+ public String toString() {
+ return taskId + "(" + apd + ")";
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
index 6d61ccb..f17ac12 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
@@ -26,9 +26,11 @@
ABORTED,
}
+ private final TaskClusterAttempt tcAttempt;
+
private final TaskAttemptId taskId;
- private final TaskState taskState;
+ private final Task taskState;
private String nodeId;
@@ -36,16 +38,21 @@
private Exception exception;
- public TaskAttempt(TaskAttemptId taskId, TaskState taskState) {
+ public TaskAttempt(TaskClusterAttempt tcAttempt, TaskAttemptId taskId, Task taskState) {
+ this.tcAttempt = tcAttempt;
this.taskId = taskId;
this.taskState = taskState;
}
+ public TaskClusterAttempt getTaskClusterAttempt() {
+ return tcAttempt;
+ }
+
public TaskAttemptId getTaskAttemptId() {
return taskId;
}
- public TaskState getTaskState() {
+ public Task getTaskState() {
return taskState;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index 78ac34b..068ec5f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -24,7 +24,7 @@
public class TaskCluster {
private final ActivityCluster activityCluster;
- private final TaskState[] tasks;
+ private final Task[] tasks;
private final Set<TaskCluster> blockers;
@@ -32,7 +32,7 @@
private final List<TaskClusterAttempt> taskClusterAttempts;
- public TaskCluster(ActivityCluster activityCluster, TaskState[] tasks) {
+ public TaskCluster(ActivityCluster activityCluster, Task[] tasks) {
this.activityCluster = activityCluster;
this.tasks = tasks;
this.blockers = new HashSet<TaskCluster>();
@@ -40,7 +40,7 @@
taskClusterAttempts = new ArrayList<TaskClusterAttempt>();
}
- public TaskState[] getTasks() {
+ public Task[] getTasks() {
return tasks;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterAttempt.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterAttempt.java
index f0d951e..9d33d12 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterAttempt.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterAttempt.java
@@ -14,22 +14,34 @@
*/
package edu.uci.ics.hyracks.control.cc.job;
-
public class TaskClusterAttempt {
public enum TaskClusterStatus {
- INITIALIZED,
RUNNING,
COMPLETED,
FAILED,
+ ABORTED,
}
- private final TaskAttempt[] taskAttempts;
+ private final TaskCluster taskCluster;
+
+ private final int attempt;
+
+ private TaskAttempt[] taskAttempts;
private TaskClusterStatus status;
private int pendingTaskCounter;
- public TaskClusterAttempt(TaskAttempt[] taskAttempts) {
+ public TaskClusterAttempt(TaskCluster taskCluster, int attempt) {
+ this.taskCluster = taskCluster;
+ this.attempt = attempt;
+ }
+
+ public TaskCluster getTaskCluster() {
+ return taskCluster;
+ }
+
+ public void setTaskAttempts(TaskAttempt[] taskAttempts) {
this.taskAttempts = taskAttempts;
}
@@ -37,6 +49,10 @@
return taskAttempts;
}
+ public int getAttempt() {
+ return attempt;
+ }
+
public void setStatus(TaskClusterStatus status) {
this.status = status;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
new file mode 100644
index 0000000..a69b144
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job.manager.events;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.job.Task;
+import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
+import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
+
+public abstract class AbstractTaskLifecycleEvent extends AbstractEvent {
+ protected final ClusterControllerService ccs;
+ protected final UUID jobId;
+ protected final TaskAttemptId taId;
+ protected final String nodeId;
+
+ public AbstractTaskLifecycleEvent(ClusterControllerService ccs, UUID jobId, TaskAttemptId taId, String nodeId) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.taId = taId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public final void run() {
+ JobRun run = ccs.getRunMap().get(jobId);
+ if (run != null) {
+ TaskId tid = taId.getTaskId();
+ Map<ActivityNodeId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
+ ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
+ if (ac != null) {
+ Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskStateMap();
+ Task[] taskStates = taskStateMap.get(tid.getActivityId());
+ if (taskStates != null && taskStates.length > tid.getPartition()) {
+ Task ts = taskStates[tid.getPartition()];
+ TaskCluster tc = ts.getTaskCluster();
+ List<TaskClusterAttempt> taskClusterAttempts = tc.getAttempts();
+ if (taskClusterAttempts != null && taskClusterAttempts.size() > taId.getAttempt()) {
+ TaskClusterAttempt tca = taskClusterAttempts.get(taId.getAttempt());
+ for (TaskAttempt ta : tca.getTaskAttempts()) {
+ if (ta.getTaskAttemptId().equals(taId)) {
+ performEvent(ta);
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ protected abstract void performEvent(TaskAttempt ta);
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
index 69c244a..050c6ed 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
@@ -19,13 +19,14 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationDestroyer;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
-public class ApplicationDestroyEvent implements Runnable {
+public class ApplicationDestroyEvent extends AbstractEvent {
private final ClusterControllerService ccs;
private final String appName;
private FutureValue fv;
@@ -57,7 +58,7 @@
fv.setException(e);
return;
}
- ccs.getJobQueue().schedule(new Runnable() {
+ ccs.getJobQueue().schedule(new AbstractEvent() {
@Override
public void run() {
try {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
index 9f4ad8f..ab05d0f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
@@ -20,13 +20,14 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationStarter;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
-public class ApplicationStartEvent implements Runnable {
+public class ApplicationStartEvent extends AbstractEvent {
private final ClusterControllerService ccs;
private final String appName;
private final FutureValue fv;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
index f53fb5f..e4353f8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
@@ -19,9 +19,9 @@
import org.json.JSONObject;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
-public class GetJobProfileJSONEvent extends SynchronizableRunnable {
+public class GetJobProfileJSONEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final UUID jobId;
private final int attempt;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
index 509dcd2..195b8c1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
@@ -20,9 +20,9 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
-public class GetJobSpecificationJSONEvent extends SynchronizableRunnable {
+public class GetJobSpecificationJSONEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final UUID jobId;
private JSONObject spec;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java
index c80974a..7768587 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusConditionVariableEvent.java
@@ -18,9 +18,9 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
-public class GetJobStatusConditionVariableEvent extends SynchronizableRunnable {
+public class GetJobStatusConditionVariableEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final UUID jobId;
private IJobStatusConditionVariable cVar;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java
index 9b15e5c..589ac34 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobStatusEvent.java
@@ -19,9 +19,9 @@
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
-public class GetJobStatusEvent extends SynchronizableRunnable {
+public class GetJobStatusEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final UUID jobId;
private JobStatus status;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSummariesJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSummariesJSONEvent.java
index 3c19f59..bf5e5c3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSummariesJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSummariesJSONEvent.java
@@ -19,9 +19,9 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
-public class GetJobSummariesJSONEvent extends SynchronizableRunnable {
+public class GetJobSummariesJSONEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private JSONArray summaries;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetNodeEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetNodeEvent.java
index 7554e62..0db7f2a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetNodeEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetNodeEvent.java
@@ -16,9 +16,9 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
-public class GetNodeEvent extends SynchronizableRunnable {
+public class GetNodeEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final String nodeId;
private NodeControllerState state;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
index 87ba685..f59e9cc 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
@@ -22,10 +22,11 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
import edu.uci.ics.hyracks.control.cc.remote.ops.JobCompleteNotifier;
-public class JobCleanupEvent implements Runnable {
+public class JobCleanupEvent extends AbstractEvent {
private ClusterControllerService ccs;
private UUID jobId;
private JobStatus status;
@@ -57,7 +58,7 @@
e.printStackTrace();
}
}
- ccs.getJobQueue().schedule(new Runnable() {
+ ccs.getJobQueue().schedule(new AbstractEvent() {
@Override
public void run() {
CCApplicationContext appCtx = ccs.getApplicationMap().get(
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
index d6b09db..9e47d8a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
@@ -29,9 +29,9 @@
import edu.uci.ics.hyracks.control.cc.job.JobActivityGraphBuilder;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
-public class JobCreateEvent extends SynchronizableRunnable {
+public class JobCreateEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final byte[] jobSpec;
private final EnumSet<JobFlag> jobFlags;
@@ -65,7 +65,7 @@
});
final JobActivityGraph jag = builder.getActivityGraph();
- JobRun run = new JobRun(UUID.randomUUID(), jag);
+ JobRun run = new JobRun(jobId, jag);
run.setStatus(JobStatus.INITIALIZED, null);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
index 7b9a3c6..5fd70b4 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobStartEvent.java
@@ -19,9 +19,9 @@
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
-public class JobStartEvent extends SynchronizableRunnable {
+public class JobStartEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final UUID jobId;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/NodeHeartbeatEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/NodeHeartbeatEvent.java
index 348d703..8643e6a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/NodeHeartbeatEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/NodeHeartbeatEvent.java
@@ -15,12 +15,13 @@
package edu.uci.ics.hyracks.control.cc.job.manager.events;
import java.util.Map;
+import java.util.logging.Level;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
-public class NodeHeartbeatEvent extends SynchronizableRunnable {
+public class NodeHeartbeatEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final String nodeId;
@@ -37,4 +38,9 @@
state.notifyHeartbeat();
}
}
+
+ @Override
+ public Level logLevel() {
+ return Level.FINEST;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
index 0754dc1..6316b3e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
@@ -18,9 +18,9 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
-public class RegisterNodeEvent extends SynchronizableRunnable {
+public class RegisterNodeEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final String nodeId;
private final NodeControllerState state;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
index 0269122..e9861d8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
@@ -21,9 +21,10 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
import edu.uci.ics.hyracks.control.common.base.INodeController;
-public class RegisterPartitionAvailibilityEvent implements Runnable {
+public class RegisterPartitionAvailibilityEvent extends AbstractEvent {
private final ClusterControllerService ccs;
private final PartitionId pid;
private final NetworkAddress networkAddress;
@@ -58,4 +59,9 @@
}
}
}
+
+ @Override
+ public String toString() {
+ return "PartitionAvailable@" + networkAddress + "[" + pid + "]";
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
index 4d34e6e..e7c92e7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
@@ -22,9 +22,10 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
import edu.uci.ics.hyracks.control.common.base.INodeController;
-public class RegisterPartitionRequestEvent implements Runnable {
+public class RegisterPartitionRequestEvent extends AbstractEvent {
private final ClusterControllerService ccs;
private final Collection<PartitionId> requiredPartitionIds;
private final String nodeId;
@@ -61,4 +62,9 @@
}
}
}
+
+ @Override
+ public String toString() {
+ return "PartitionRequest@[" + nodeId + "][" + requiredPartitionIds + "]";
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 8a1703b..b64a784 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -17,12 +17,14 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
-public class RemoveDeadNodesEvent implements Runnable {
+public class RemoveDeadNodesEvent extends AbstractEvent {
private static Logger LOGGER = Logger.getLogger(RemoveDeadNodesEvent.class.getName());
private final ClusterControllerService ccs;
@@ -47,4 +49,9 @@
// Deal with dead tasks.
}
}
+
+ @Override
+ public Level logLevel() {
+ return Level.FINE;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java
index 24f884a..a103bdd 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ReportProfilesEvent.java
@@ -17,12 +17,14 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.logging.Level;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
-public class ReportProfilesEvent implements Runnable {
+public class ReportProfilesEvent extends AbstractEvent {
private final ClusterControllerService ccs;
private final List<JobProfile> profiles;
@@ -38,4 +40,9 @@
JobRun run = runMap.get(profile.getJobId());
}
}
+
+ @Override
+ public Level logLevel() {
+ return Level.FINEST;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
index f9fa355..29a341f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskCompleteEvent.java
@@ -14,65 +14,29 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.List;
-import java.util.Map;
import java.util.UUID;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
-import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskState;
-public class TaskCompleteEvent implements Runnable {
- private final ClusterControllerService ccs;
- private final UUID jobId;
- private final TaskAttemptId taId;
- private final String nodeId;
-
+public class TaskCompleteEvent extends AbstractTaskLifecycleEvent {
public TaskCompleteEvent(ClusterControllerService ccs, UUID jobId, TaskAttemptId taId, String nodeId) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.taId = taId;
- this.nodeId = nodeId;
+ super(ccs, jobId, taId, nodeId);
}
@Override
- public void run() {
- JobRun run = ccs.getRunMap().get(jobId);
- if (run != null) {
- TaskId tid = taId.getTaskId();
- Map<ActivityNodeId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
- ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
- if (ac != null) {
- Map<ActivityNodeId, TaskState[]> taskStateMap = ac.getTaskStateMap();
- TaskState[] taskStates = taskStateMap.get(tid.getActivityId());
- if (taskStates != null && taskStates.length > tid.getPartition()) {
- TaskState ts = taskStates[tid.getPartition()];
- TaskCluster tc = ts.getTaskCluster();
- List<TaskClusterAttempt> taskClusterAttempts = tc.getAttempts();
- if (taskClusterAttempts != null && taskClusterAttempts.size() > taId.getAttempt()) {
- TaskClusterAttempt tca = taskClusterAttempts.get(taId.getAttempt());
- TaskAttempt ta = tca.getTaskAttempts()[tid.getPartition()];
- try {
- ta.notifyTaskComplete();
- } catch (HyracksException e) {
- e.printStackTrace();
- }
- }
- }
- }
+ protected void performEvent(TaskAttempt ta) {
+ try {
+ ta.notifyTaskComplete();
+ } catch (HyracksException e) {
+ e.printStackTrace();
}
}
@Override
public String toString() {
- return "TaskCompleteEvent[" + jobId + ":" + taId + ":" + nodeId + "]";
+ return "TaskCompleteEvent@[" + nodeId + "[" + jobId + ":" + taId + "]";
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
index 710c2b1..bd6e71e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/TaskFailureEvent.java
@@ -14,67 +14,33 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.List;
-import java.util.Map;
import java.util.UUID;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
-import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskState;
-public class TaskFailureEvent implements Runnable {
- private final ClusterControllerService ccs;
- private final UUID jobId;
- private final TaskAttemptId taId;
- private final String nodeId;
+public class TaskFailureEvent extends AbstractTaskLifecycleEvent {
private final Exception exception;
- public TaskFailureEvent(ClusterControllerService ccs, UUID jobId, TaskAttemptId taId, String nodeId, Exception exception) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.taId = taId;
- this.nodeId = nodeId;
+ public TaskFailureEvent(ClusterControllerService ccs, UUID jobId, TaskAttemptId taId, String nodeId,
+ Exception exception) {
+ super(ccs, jobId, taId, nodeId);
this.exception = exception;
}
@Override
- public void run() {
- JobRun run = ccs.getRunMap().get(jobId);
- if (run != null) {
- TaskId tid = taId.getTaskId();
- Map<ActivityNodeId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
- ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
- if (ac != null) {
- Map<ActivityNodeId, TaskState[]> taskStateMap = ac.getTaskStateMap();
- TaskState[] taskStates = taskStateMap.get(tid.getActivityId());
- if (taskStates != null && taskStates.length > tid.getPartition()) {
- TaskState ts = taskStates[tid.getPartition()];
- TaskCluster tc = ts.getTaskCluster();
- List<TaskClusterAttempt> taskClusterAttempts = tc.getAttempts();
- if (taskClusterAttempts != null && taskClusterAttempts.size() > taId.getAttempt()) {
- TaskClusterAttempt tca = taskClusterAttempts.get(taId.getAttempt());
- TaskAttempt ta = tca.getTaskAttempts()[tid.getPartition()];
- try {
- ta.notifyTaskFailure(exception);
- } catch (HyracksException e) {
- e.printStackTrace();
- }
- }
- }
- }
+ protected void performEvent(TaskAttempt ta) {
+ try {
+ ta.notifyTaskFailure(exception);
+ } catch (HyracksException e) {
+ e.printStackTrace();
}
}
@Override
public String toString() {
- return "TaskCompleteEvent[" + jobId + ":" + taId + ":" + nodeId + "]";
+ return "TaskFailureEvent[" + jobId + ":" + taId + ":" + nodeId + "]";
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/UnregisterNodeEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/UnregisterNodeEvent.java
index fc1ba03..b89de07 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/UnregisterNodeEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/UnregisterNodeEvent.java
@@ -18,9 +18,9 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
-public class UnregisterNodeEvent extends SynchronizableRunnable {
+public class UnregisterNodeEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final String nodeId;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/AbstractEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/AbstractEvent.java
new file mode 100644
index 0000000..5fcd56a
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/AbstractEvent.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.jobqueue;
+
+import java.util.logging.Level;
+
+public abstract class AbstractEvent implements Runnable {
+ public Level logLevel() {
+ return Level.INFO;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/JobQueue.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/JobQueue.java
index f533ad0..84a844c 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/JobQueue.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/JobQueue.java
@@ -20,21 +20,23 @@
public class JobQueue {
private static final Logger LOGGER = Logger.getLogger(JobQueue.class.getName());
- private final LinkedBlockingQueue<Runnable> queue;
+ private final LinkedBlockingQueue<AbstractEvent> queue;
private final JobThread thread;
public JobQueue() {
- queue = new LinkedBlockingQueue<Runnable>();
+ queue = new LinkedBlockingQueue<AbstractEvent>();
thread = new JobThread();
thread.start();
}
- public void schedule(Runnable runnable) {
- LOGGER.info("Scheduling: " + runnable);
- queue.offer(runnable);
+ public void schedule(AbstractEvent event) {
+ if (LOGGER.isLoggable(event.logLevel())) {
+ LOGGER.info("Scheduling: " + event);
+ }
+ queue.offer(event);
}
- public void scheduleAndSync(SynchronizableRunnable sRunnable) throws Exception {
+ public void scheduleAndSync(SynchronizableEvent sRunnable) throws Exception {
schedule(sRunnable);
sRunnable.sync();
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableEvent.java
new file mode 100644
index 0000000..88c9097
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.jobqueue;
+
+public abstract class SynchronizableEvent extends AbstractEvent {
+ private boolean done;
+
+ private Exception e;
+
+ protected abstract void doRun() throws Exception;
+
+ public void init() {
+ done = false;
+ e = null;
+ }
+
+ @Override
+ public final void run() {
+ try {
+ doRun();
+ } catch (Exception e) {
+ this.e = e;
+ } finally {
+ synchronized (this) {
+ done = true;
+ notifyAll();
+ }
+ }
+ }
+
+ public final synchronized void sync() throws Exception {
+ while (!done) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ if (e != null) {
+ throw e;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableRunnable.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableRunnable.java
deleted file mode 100644
index 53315b7..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/SynchronizableRunnable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package edu.uci.ics.hyracks.control.cc.jobqueue;
-
-public abstract class SynchronizableRunnable implements Runnable {
- private boolean done;
-
- private Exception e;
-
- protected abstract void doRun() throws Exception;
-
- public void init() {
- done = false;
- e = null;
- }
-
- @Override
- public final void run() {
- try {
- doRun();
- } catch (Exception e) {
- this.e = e;
- } finally {
- synchronized (this) {
- done = true;
- notifyAll();
- }
- }
- }
-
- public final synchronized void sync() throws Exception {
- while (!done) {
- try {
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- if (e != null) {
- throw e;
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
new file mode 100644
index 0000000..d3fc871
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityPartitionDetails.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.scheduler;
+
+import java.util.Arrays;
+
+public class ActivityPartitionDetails {
+ private final int nPartitions;
+
+ private final int[] nInputPartitions;
+
+ private final int[] nOutputPartitions;
+
+ public ActivityPartitionDetails(int nPartitions, int[] nInputPartitions, int[] nOutputPartitions) {
+ this.nPartitions = nPartitions;
+ this.nInputPartitions = nInputPartitions;
+ this.nOutputPartitions = nOutputPartitions;
+ }
+
+ public int getPartitionCount() {
+ return nPartitions;
+ }
+
+ public int[] getInputPartitionCounts() {
+ return nInputPartitions;
+ }
+
+ public int[] getOutputPartitionCounts() {
+ return nOutputPartitions;
+ }
+
+ @Override
+ public String toString() {
+ return nPartitions + ":" + Arrays.toString(nInputPartitions) + ":" + Arrays.toString(nOutputPartitions);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
index 1e68f76..cdd6a65 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
@@ -16,17 +16,21 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
+import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -36,10 +40,11 @@
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.job.Task;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskState;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
public class DefaultActivityClusterStateMachine implements IActivityClusterStateMachine {
private static final Logger LOGGER = Logger.getLogger(DefaultActivityClusterStateMachine.class.getName());
@@ -50,11 +55,14 @@
private final ActivityCluster ac;
+ private final Set<TaskCluster> inProgressTaskClusters;
+
public DefaultActivityClusterStateMachine(ClusterControllerService ccs, DefaultJobRunStateMachine jsm,
ActivityCluster ac) {
this.ccs = ccs;
this.jsm = jsm;
this.ac = ac;
+ inProgressTaskClusters = new HashSet<TaskCluster>();
}
@Override
@@ -62,67 +70,78 @@
startRunnableTaskClusters();
}
- private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptId>> taskAttemptMap)
+ private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
throws HyracksException {
- TaskState[] tasks = tc.getTasks();
+ Task[] tasks = tc.getTasks();
List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
int attempts = tcAttempts.size();
+ TaskClusterAttempt tcAttempt = new TaskClusterAttempt(tc, attempts);
TaskAttempt[] taskAttempts = new TaskAttempt[tasks.length];
Map<TaskId, LValueConstraintExpression> locationMap = new HashMap<TaskId, LValueConstraintExpression>();
for (int i = 0; i < tasks.length; ++i) {
- TaskState ts = tasks[i];
+ Task ts = tasks[i];
TaskId tid = ts.getTaskId();
- TaskAttempt taskAttempt = new TaskAttempt(new TaskAttemptId(new TaskId(tid.getActivityId(),
+ TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(),
tid.getPartition()), attempts), ts);
+ taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
locationMap.put(tid,
new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition()));
taskAttempts[i] = taskAttempt;
}
+ tcAttempt.setTaskAttempts(taskAttempts);
PartitionConstraintSolver solver = jsm.getSolver();
solver.solve(locationMap.values());
+ Map<OperatorDescriptorId, String> operatorLocationAssignmentMap = jsm.getOperatorLocationAssignmentMap();
for (int i = 0; i < tasks.length; ++i) {
- TaskState ts = tasks[i];
+ Task ts = tasks[i];
TaskId tid = ts.getTaskId();
TaskAttempt taskAttempt = taskAttempts[i];
- LValueConstraintExpression pLocationExpr = locationMap.get(tid);
- Object location = solver.getValue(pLocationExpr);
- String nodeId = null;
- Set<String> liveNodes = ccs.getNodeMap().keySet();
- if (location == null) {
- // pick any
- nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt())
- % liveNodes.size()];
- } else if (location instanceof String) {
- nodeId = (String) location;
- if (!liveNodes.contains(nodeId)) {
- throw new HyracksException("Node " + nodeId + " not live");
- }
- } else if (location instanceof String[]) {
- for (String choice : (String[]) location) {
- if (liveNodes.contains(choice)) {
- nodeId = choice;
- break;
+ String nodeId = operatorLocationAssignmentMap.get(tid.getActivityId().getOperatorDescriptorId());
+ if (nodeId == null) {
+ LValueConstraintExpression pLocationExpr = locationMap.get(tid);
+ Object location = solver.getValue(pLocationExpr);
+ Set<String> liveNodes = ccs.getNodeMap().keySet();
+ if (location == null) {
+ // pick any
+ nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt())
+ % liveNodes.size()];
+ } else if (location instanceof String) {
+ nodeId = (String) location;
+ if (!liveNodes.contains(nodeId)) {
+ throw new HyracksException("Node " + nodeId + " not live");
}
+ } else if (location instanceof String[]) {
+ for (String choice : (String[]) location) {
+ if (liveNodes.contains(choice)) {
+ nodeId = choice;
+ break;
+ }
+ }
+ if (nodeId == null) {
+ throw new HyracksException("No satisfiable location found for "
+ + taskAttempt.getTaskAttemptId());
+ }
+ } else {
+ throw new HyracksException("Unknown type of value for " + pLocationExpr + ": " + location + "("
+ + location.getClass() + ")");
}
- if (nodeId == null) {
- throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId());
- }
- } else {
- throw new HyracksException("Unknown type of value for " + pLocationExpr + ": " + location + "("
- + location.getClass() + ")");
+ operatorLocationAssignmentMap.put(tid.getActivityId().getOperatorDescriptorId(), nodeId);
}
taskAttempt.setNodeId(nodeId);
- List<TaskAttemptId> taIds = taskAttemptMap.get(nodeId);
- if (taIds == null) {
- taIds = new ArrayList<TaskAttemptId>();
- taskAttemptMap.put(nodeId, taIds);
+ taskAttempt.setStatus(TaskAttempt.TaskStatus.RUNNING, null);
+ List<TaskAttemptDescriptor> tads = taskAttemptMap.get(nodeId);
+ if (tads == null) {
+ tads = new ArrayList<TaskAttemptDescriptor>();
+ taskAttemptMap.put(nodeId, tads);
}
- taIds.add(taskAttempt.getTaskAttemptId());
+ ActivityPartitionDetails apd = ts.getActivityPartitionDetails();
+ tads.add(new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(), apd.getPartitionCount(), apd
+ .getInputPartitionCounts(), apd.getOutputPartitionCounts()));
}
- TaskClusterAttempt tcAttempt = new TaskClusterAttempt(taskAttempts);
tcAttempt.initializePendingTaskCounter();
tcAttempts.add(tcAttempt);
-
+ tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
+ inProgressTaskClusters.add(tc);
}
@Override
@@ -138,6 +157,7 @@
ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
if (tcAttempt.decrementPendingTasksCounter() == 0) {
tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
+ inProgressTaskClusters.remove(tc);
startRunnableTaskClusters();
}
} else {
@@ -151,9 +171,17 @@
private void startRunnableTaskClusters() throws HyracksException {
TaskCluster[] taskClusters = ac.getTaskClusters();
- Map<String, List<TaskAttemptId>> taskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
+ Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
for (TaskCluster tc : taskClusters) {
Set<TaskCluster> dependencies = tc.getDependencies();
+ List<TaskClusterAttempt> attempts = tc.getAttempts();
+ if (!attempts.isEmpty()) {
+ TaskClusterAttempt lastAttempt = attempts.get(attempts.size() - 1);
+ if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
+ || lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+ continue;
+ }
+ }
boolean runnable = true;
for (TaskCluster depTC : dependencies) {
List<TaskClusterAttempt> tcAttempts = depTC.getAttempts();
@@ -168,30 +196,45 @@
}
}
if (runnable) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Found runnable TC: " + Arrays.toString(tc.getTasks()));
+ LOGGER.info("Attempts so far:" + attempts.size());
+ for (TaskClusterAttempt tcAttempt : attempts) {
+ LOGGER.info("Status: " + tcAttempt.getStatus());
+ }
+ }
assignTaskLocations(tc, taskAttemptMap);
}
}
+ if (taskAttemptMap.isEmpty()) {
+ if (inProgressTaskClusters.isEmpty()) {
+ ac.notifyActivityClusterComplete();
+ }
+ return;
+ }
+
startTasks(taskAttemptMap);
}
- private void startTasks(Map<String, List<TaskAttemptId>> taskAttemptMap) {
+ private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) {
Executor executor = ccs.getExecutor();
JobRun jobRun = ac.getJobRun();
final UUID jobId = jobRun.getJobId();
final JobActivityGraph jag = jobRun.getJobActivityGraph();
final String appName = jag.getApplicationName();
- for (Map.Entry<String, List<TaskAttemptId>> e : taskAttemptMap.entrySet()) {
+ for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
String nodeId = e.getKey();
- final List<TaskAttemptId> taskIds = e.getValue();
+ final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
final NodeControllerState node = ccs.getNodeMap().get(nodeId);
if (node != null) {
+ jobRun.getParticipatingNodeIds().add(nodeId);
executor.execute(new Runnable() {
@Override
public void run() {
try {
node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
- taskIds, null);
+ taskDescriptors);
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
@@ -203,6 +246,41 @@
}
}
+ private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
+ Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
+ for (TaskAttempt ta2 : tcAttempt.getTaskAttempts()) {
+ if (ta2.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
+ ta2.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
+ List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta2.getNodeId());
+ if (abortTaskAttempts == null) {
+ abortTaskAttempts = new ArrayList<TaskAttemptId>();
+ abortTaskAttemptMap.put(ta2.getNodeId(), abortTaskAttempts);
+ }
+ abortTaskAttempts.add(ta2.getTaskAttemptId());
+ }
+ }
+ JobRun jobRun = ac.getJobRun();
+ final UUID jobId = jobRun.getJobId();
+ for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
+ final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
+ final List<TaskAttemptId> abortTaskAttempts = e.getValue();
+ if (node != null) {
+ LOGGER.info("Aborting: " + abortTaskAttempts + " at " + e.getKey());
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ node.getNodeController().abortTasks(jobId, abortTaskAttempts);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+
+ }
+
@Override
public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException {
TaskAttemptId taId = ta.getTaskAttemptId();
@@ -212,43 +290,11 @@
if (taId.getAttempt() == lastAttempt) {
TaskClusterAttempt tcAttempt = tcAttempts.get(lastAttempt);
TaskAttempt.TaskStatus taStatus = ta.getStatus();
- Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
- for (TaskAttempt ta2 : tcAttempt.getTaskAttempts()) {
- if (ta2.getStatus() == TaskAttempt.TaskStatus.RUNNING
- || ta2.getStatus() == TaskAttempt.TaskStatus.INITIALIZED) {
- ta2.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
- List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta2.getNodeId());
- if (abortTaskAttempts == null) {
- abortTaskAttempts = new ArrayList<TaskAttemptId>();
- abortTaskAttemptMap.put(ta2.getNodeId(), abortTaskAttempts);
- }
- abortTaskAttempts.add(ta2.getTaskAttemptId());
- }
- }
- JobRun jobRun = ac.getJobRun();
- final UUID jobId = jobRun.getJobId();
- for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
- final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
- final List<TaskAttemptId> abortTaskAttempts = e.getValue();
- if (node != null) {
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- node.getNodeController().abortTasks(jobId, abortTaskAttempts);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
+ abortTaskCluster(tcAttempt);
tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
- Map<String, List<TaskAttemptId>> taskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
- assignTaskLocations(tc, taskAttemptMap);
- startTasks(taskAttemptMap);
+ ac.notifyTaskClusterFailure(tcAttempt, exception);
} else {
LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
}
@@ -256,4 +302,32 @@
LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
}
}
+
+ @Override
+ public void abort() {
+ TaskCluster[] taskClusters = ac.getTaskClusters();
+ for (TaskCluster tc : taskClusters) {
+ List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
+ if (!tcAttempts.isEmpty()) {
+ TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
+ if (tcAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+ abortTaskCluster(tcAttempt);
+ tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException {
+ TaskCluster tc = tcAttempt.getTaskCluster();
+ if (tcAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
+ abort();
+ ac.getJobRun().getStateMachine().notifyActivityClusterFailure(ac, exception);
+ return;
+ }
+ Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
+ assignTaskLocations(tc, taskAttemptMap);
+ startTasks(taskAttemptMap);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
index 9e70b42..b20c467 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.control.cc.scheduler;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -34,7 +35,6 @@
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
@@ -47,11 +47,11 @@
import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
-import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.job.Task;
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
-import edu.uci.ics.hyracks.control.cc.job.TaskState;
import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
+import edu.uci.ics.hyracks.control.common.job.dataflow.PipelinedConnectorPolicy;
public class DefaultJobRunStateMachine implements IJobRunStateMachine {
private static final Logger LOGGER = Logger.getLogger(DefaultJobRunStateMachine.class.getName());
@@ -60,6 +60,8 @@
private final JobRun jobRun;
+ private final Map<OperatorDescriptorId, String> operatorLocationAssignmentMap;
+
private final Set<ActivityCluster> completedClusters;
private final Set<ActivityCluster> inProgressClusters;
@@ -71,10 +73,15 @@
public DefaultJobRunStateMachine(ClusterControllerService ccs, JobRun jobRun) {
this.ccs = ccs;
this.jobRun = jobRun;
+ this.operatorLocationAssignmentMap = new HashMap<OperatorDescriptorId, String>();
completedClusters = new HashSet<ActivityCluster>();
inProgressClusters = new HashSet<ActivityCluster>();
}
+ public Map<OperatorDescriptorId, String> getOperatorLocationAssignmentMap() {
+ return operatorLocationAssignmentMap;
+ }
+
public PartitionConstraintSolver getSolver() {
return solver;
}
@@ -161,6 +168,7 @@
s.addDependent(bs);
}
}
+ jobRun.getActivityClusterMap().putAll(stageMap);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
for (ActivityCluster s : stages) {
@@ -193,7 +201,8 @@
}
private void findRunnableClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
- if (completedClusters.contains(candidate) || frontier.contains(candidate)) {
+ if (completedClusters.contains(candidate) || frontier.contains(candidate)
+ || inProgressClusters.contains(candidate)) {
return;
}
boolean runnable = true;
@@ -241,26 +250,32 @@
solver.addConstraints(contributedConstraints);
rootActivityCluster = inferStages(jag);
- Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
- findRunnableClusters(runnableClusters);
- if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
- ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
- return;
- }
- for (ActivityCluster ac : runnableClusters) {
- inProgressClusters.add(ac);
- buildTaskClusters(ac);
- IActivityClusterStateMachine acsm = new DefaultActivityClusterStateMachine(ccs, this, ac);
- ac.setStateMachine(acsm);
- acsm.schedule();
- }
+ startRunnableActivityClusters();
} catch (Exception e) {
+ e.printStackTrace();
ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.FAILURE, e));
throw new HyracksException(e);
}
}
- private Map<ActivityNodeId, Integer> computePartitionCounts(ActivityCluster ac) throws HyracksException {
+ private void startRunnableActivityClusters() throws HyracksException {
+ Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
+ findRunnableClusters(runnableClusters);
+ if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
+ ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
+ return;
+ }
+ for (ActivityCluster ac : runnableClusters) {
+ inProgressClusters.add(ac);
+ buildTaskClusters(ac);
+ IActivityClusterStateMachine acsm = new DefaultActivityClusterStateMachine(ccs, this, ac);
+ ac.setStateMachine(acsm);
+ acsm.schedule();
+ }
+ }
+
+ private Map<ActivityNodeId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
+ throws HyracksException {
Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
for (ActivityNodeId anId : ac.getActivities()) {
lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
@@ -282,22 +297,43 @@
}
nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
}
- Map<ActivityNodeId, Integer> activityPartsMap = new HashMap<ActivityNodeId, Integer>();
+ Map<ActivityNodeId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityNodeId, ActivityPartitionDetails>();
for (ActivityNodeId anId : ac.getActivities()) {
- activityPartsMap.put(anId, nPartMap.get(anId.getOperatorDescriptorId()));
+ int nParts = nPartMap.get(anId.getOperatorDescriptorId());
+ int[] nInputPartitions = null;
+ List<IConnectorDescriptor> inputs = jobRun.getJobActivityGraph().getActivityInputConnectorDescriptors(anId);
+ if (inputs != null) {
+ nInputPartitions = new int[inputs.size()];
+ for (int i = 0; i < nInputPartitions.length; ++i) {
+ nInputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
+ .getProducerActivity(inputs.get(i).getConnectorId()).getOperatorDescriptorId());
+ }
+ }
+ int[] nOutputPartitions = null;
+ List<IConnectorDescriptor> outputs = jobRun.getJobActivityGraph().getActivityOutputConnectorDescriptors(
+ anId);
+ if (outputs != null) {
+ nOutputPartitions = new int[outputs.size()];
+ for (int i = 0; i < nOutputPartitions.length; ++i) {
+ nOutputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
+ .getConsumerActivity(outputs.get(i).getConnectorId()).getOperatorDescriptorId());
+ }
+ }
+ ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
+ activityPartsMap.put(anId, apd);
}
return activityPartsMap;
}
private void buildTaskClusters(ActivityCluster ac) throws HyracksException {
- Map<ActivityNodeId, Integer> pcMap = computePartitionCounts(ac);
- Map<ActivityNodeId, TaskState[]> taskStateMap = ac.getTaskStateMap();
+ Map<ActivityNodeId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
+ Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskStateMap();
for (ActivityNodeId anId : ac.getActivities()) {
- int nParts = pcMap.get(anId);
- TaskState[] taskStates = new TaskState[nParts];
- for (int i = 0; i < nParts; ++i) {
- taskStates[i] = new TaskState(new TaskId(anId, i));
+ ActivityPartitionDetails apd = pcMap.get(anId);
+ Task[] taskStates = new Task[apd.getPartitionCount()];
+ for (int i = 0; i < taskStates.length; ++i) {
+ taskStates[i] = new Task(new TaskId(anId, i), apd);
}
taskStateMap.put(anId, taskStates);
}
@@ -308,8 +344,8 @@
Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
for (ActivityNodeId anId : activities) {
- TaskState[] taskStates = taskStateMap.get(anId);
- for (TaskState ts : taskStates) {
+ Task[] taskStates = taskStateMap.get(anId);
+ for (Task ts : taskStates) {
Set<TaskId> cluster = new HashSet<TaskId>();
cluster.add(ts.getTaskId());
taskClusterMap.put(ts.getTaskId(), cluster);
@@ -320,15 +356,18 @@
JobActivityGraph jag = jobRun.getJobActivityGraph();
BitSet targetBitmap = new BitSet();
for (ActivityNodeId ac1 : activities) {
- TaskState[] ac1TaskStates = taskStateMap.get(ac1);
+ Task[] ac1TaskStates = taskStateMap.get(ac1);
int nProducers = ac1TaskStates.length;
List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
if (outputConns != null) {
for (IConnectorDescriptor c : outputConns) {
ConnectorDescriptorId cdId = c.getConnectorId();
IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
+ if (cPolicy == null) {
+ cPolicy = new PipelinedConnectorPolicy();
+ }
ActivityNodeId ac2 = jag.getConsumerActivity(cdId);
- TaskState[] ac2TaskStates = taskStateMap.get(ac2);
+ Task[] ac2TaskStates = taskStateMap.get(ac2);
int nConsumers = ac2TaskStates.length;
for (int i = 0; i < nProducers; ++i) {
c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
@@ -339,7 +378,7 @@
connectionInfo.put(ac1TaskStates[i].getTaskId(), cInfoList);
}
Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
- for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j)) {
+ for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
cInfoList.add(new Pair<TaskId, IConnectorPolicy>(ac2TaskStates[j].getTaskId(), cPolicy));
if (cPolicy.requiresProducerConsumerCoscheduling()) {
cluster.add(ac2TaskStates[j].getTaskId());
@@ -378,11 +417,11 @@
Set<Set<TaskId>> clusters = new HashSet<Set<TaskId>>(taskClusterMap.values());
Set<TaskCluster> tcSet = new HashSet<TaskCluster>();
for (Set<TaskId> cluster : clusters) {
- Set<TaskState> taskStates = new HashSet<TaskState>();
+ Set<Task> taskStates = new HashSet<Task>();
for (TaskId tid : cluster) {
taskStates.add(taskStateMap.get(tid.getActivityId())[tid.getPartition()]);
}
- TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new TaskState[taskStates.size()]));
+ TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new Task[taskStates.size()]));
tcSet.add(tc);
for (TaskId tid : cluster) {
taskStateMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
@@ -391,12 +430,12 @@
ac.setTaskClusters(tcSet.toArray(new TaskCluster[tcSet.size()]));
for (TaskCluster tc : tcSet) {
- for (TaskState ts : tc.getTasks()) {
+ for (Task ts : tc.getTasks()) {
TaskId tid = ts.getTaskId();
List<Pair<TaskId, IConnectorPolicy>> cInfoList = connectionInfo.get(tid);
if (cInfoList != null) {
for (Pair<TaskId, IConnectorPolicy> p : cInfoList) {
- TaskState targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
+ Task targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
TaskCluster targetTC = targetTS.getTaskCluster();
if (targetTC != tc) {
targetTC.getDependencies().add(tc);
@@ -411,6 +450,14 @@
computeBlockerClosure(tcSet);
computeDependencyClosure(tcSet);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Plan for " + ac);
+ LOGGER.info("Built " + tcSet.size() + " Task Clusters");
+ for (TaskCluster tc : tcSet) {
+ LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+ }
+ }
}
private void computeDependencyClosure(Set<TaskCluster> tcSet) {
@@ -454,4 +501,23 @@
}
}
}
+
+ @Override
+ public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException {
+ for (ActivityCluster ac2 : inProgressClusters) {
+ abortActivityCluster(ac2);
+ }
+ jobRun.setStatus(JobStatus.FAILURE, exception);
+ }
+
+ private void abortActivityCluster(ActivityCluster ac) throws HyracksException {
+ ac.getStateMachine().abort();
+ }
+
+ @Override
+ public void notifyActivityClusterComplete(ActivityCluster ac) throws HyracksException {
+ completedClusters.add(ac);
+ inProgressClusters.remove(ac);
+ startRunnableActivityClusters();
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
index 2c4a4ae..02adfa5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
@@ -16,11 +16,16 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
public interface IActivityClusterStateMachine {
public void schedule() throws HyracksException;
+ public void abort() throws HyracksException;
+
public void notifyTaskComplete(TaskAttempt ta) throws HyracksException;
public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException;
+
+ public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException;
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
index fcf0a79..97ca9fe 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
@@ -15,7 +15,12 @@
package edu.uci.ics.hyracks.control.cc.scheduler;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
public interface IJobRunStateMachine {
public void schedule() throws HyracksException;
+
+ public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException;
+
+ public void notifyActivityClusterComplete(ActivityCluster activityCluster) throws HyracksException;
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/AdminConsoleHandler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/AdminConsoleHandler.java
index 1a26e60..a0510f9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/AdminConsoleHandler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/AdminConsoleHandler.java
@@ -27,7 +27,7 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
public class AdminConsoleHandler extends AbstractHandler {
private ClusterControllerService ccs;
@@ -51,7 +51,7 @@
writer.println("<h2>Node Controllers</h2>");
writer.println("<table><tr><td>Node Id</td><td>Host</td></tr>");
try {
- ccs.getJobQueue().scheduleAndSync(new SynchronizableRunnable() {
+ ccs.getJobQueue().scheduleAndSync(new SynchronizableEvent() {
@Override
protected void doRun() throws Exception {
for (Map.Entry<String, NodeControllerState> e : ccs.getNodeMap().entrySet()) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
index 65c156c..e047212 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
@@ -28,7 +28,7 @@
import org.eclipse.jetty.server.handler.AbstractHandler;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
public class ApplicationInstallationHandler extends AbstractHandler {
@@ -54,7 +54,7 @@
}
final String appName = parts[0];
if (HttpMethods.PUT.equals(request.getMethod())) {
- class OutputStreamGetter extends SynchronizableRunnable {
+ class OutputStreamGetter extends SynchronizableEvent {
private OutputStream os;
@Override
@@ -78,7 +78,7 @@
r.os.close();
}
} else if (HttpMethods.GET.equals(request.getMethod())) {
- class InputStreamGetter extends SynchronizableRunnable {
+ class InputStreamGetter extends SynchronizableEvent {
private InputStream is;
@Override
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index f0ceed6..4043baa 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -16,13 +16,12 @@
import java.rmi.Remote;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
public interface INodeController extends Remote {
public String getId() throws Exception;
@@ -31,8 +30,8 @@
public NodeCapability getNodeCapability() throws Exception;
- public void startTasks(String appName, UUID jobId, byte[] planBytes, List<TaskAttemptId> tasks,
- Map<OperatorDescriptorId, Integer> opNumPartitions) throws Exception;
+ public void startTasks(String appName, UUID jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors)
+ throws Exception;
public void abortTasks(UUID jobId, List<TaskAttemptId> tasks) throws Exception;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
new file mode 100644
index 0000000..15ba75c
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+
+public class TaskAttemptDescriptor implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final TaskAttemptId taId;
+
+ private final int nPartitions;
+
+ private final int[] nInputPartitions;
+
+ private final int[] nOutputPartitions;
+
+ public TaskAttemptDescriptor(TaskAttemptId taId, int nPartitions, int[] nInputPartitions, int[] nOutputPartitions) {
+ this.taId = taId;
+ this.nPartitions = nPartitions;
+ this.nInputPartitions = nInputPartitions;
+ this.nOutputPartitions = nOutputPartitions;
+ }
+
+ public TaskAttemptId getTaskAttemptId() {
+ return taId;
+ }
+
+ public int getPartitionCount() {
+ return nPartitions;
+ }
+
+ public int[] getInputPartitionCounts() {
+ return nInputPartitions;
+ }
+
+ public int[] getOutputPartitionCounts() {
+ return nOutputPartitions;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 6599eb0..072c2ec 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -90,13 +90,13 @@
}
Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
if (!opEnvMap.containsKey(partition)) {
- opEnvMap.put(partition, new OperatorEnvironmentImpl());
+ opEnvMap.put(partition, new OperatorEnvironmentImpl(nodeController.getId()));
}
return opEnvMap.get(partition);
}
public void addTask(Task task) {
- taskMap.put(task.getTaskId(), task);
+ taskMap.put(task.getTaskAttemptId(), task);
}
public Map<TaskAttemptId, Task> getTaskMap() {
@@ -104,9 +104,11 @@
}
private static final class OperatorEnvironmentImpl implements IOperatorEnvironment {
+ private final String nodeId;
private final Map<String, Object> map;
- public OperatorEnvironmentImpl() {
+ public OperatorEnvironmentImpl(String nodeId) {
+ this.nodeId = nodeId;
map = new HashMap<String, Object>();
}
@@ -119,6 +121,10 @@
public void set(String name, Object value) {
map.put(name, value);
}
+
+ public String toString() {
+ return super.toString() + "@" + nodeId;
+ }
}
public Executor getExecutor() {
@@ -127,14 +133,14 @@
public synchronized void notifyTaskComplete(Task task) throws Exception {
taskMap.remove(task);
- TaskProfile taskProfile = new TaskProfile(task.getTaskId());
+ TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId());
task.dumpProfile(taskProfile);
- nodeController.notifyTaskComplete(jobId, task.getTaskId(), taskProfile);
+ nodeController.notifyTaskComplete(jobId, task.getTaskAttemptId(), taskProfile);
}
- public synchronized void notifyStageletFailed(Task task, Exception exception) throws Exception {
+ public synchronized void notifyTaskFailed(Task task, Exception exception) {
taskMap.remove(task);
- nodeController.notifyTaskFailed(jobId, task.getTaskId(), exception);
+ nodeController.notifyTaskFailed(jobId, task.getTaskAttemptId(), exception);
}
public NodeControllerService getNodeController() {
@@ -147,9 +153,9 @@
counters.put(e.getKey(), e.getValue().get());
}
for (Task task : taskMap.values()) {
- TaskProfile taskProfile = new TaskProfile(task.getTaskId());
+ TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId());
task.dumpProfile(taskProfile);
- jProfile.getTaskProfiles().put(task.getTaskId(), taskProfile);
+ jProfile.getTaskProfiles().put(task.getTaskAttemptId(), taskProfile);
}
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index bdd30a9..d05db7d 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -58,6 +58,7 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -73,6 +74,7 @@
import edu.uci.ics.hyracks.control.common.base.NodeCapability;
import edu.uci.ics.hyracks.control.common.base.NodeParameters;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -81,6 +83,7 @@
import edu.uci.ics.hyracks.control.nc.net.ConnectionManager;
import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
+import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
public class NodeControllerService extends AbstractRemoteService implements INodeController {
@@ -215,8 +218,8 @@
return InetAddress.getByAddress(ipBytes);
}
- public void startTasks(String appName, UUID jobId, byte[] jagBytes, List<TaskAttemptId> tasks,
- Map<OperatorDescriptorId, Integer> opNumPartitions) throws Exception {
+ public void startTasks(String appName, final UUID jobId, byte[] jagBytes,
+ List<TaskAttemptDescriptor> taskDescriptors) throws Exception {
try {
NCApplicationContext appCtx = applications.get(appName);
final JobActivityGraph plan = (JobActivityGraph) appCtx.deserialize(jagBytes);
@@ -235,56 +238,53 @@
final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx);
- for (TaskAttemptId tid : tasks) {
- IActivityNode han = plan.getActivityNodeMap().get(tid.getTaskId().getActivityId());
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Initializing " + tid + " -> " + han);
+ for (TaskAttemptDescriptor td : taskDescriptors) {
+ TaskAttemptId taId = td.getTaskAttemptId();
+ TaskId tid = taId.getTaskId();
+ IActivityNode han = plan.getActivityNodeMap().get(tid.getActivityId());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Initializing " + taId + " -> " + han);
}
IOperatorDescriptor op = han.getOwner();
- int partition = tid.getTaskId().getPartition();
- Task task = new Task(joblet, tid);
+ final int partition = tid.getPartition();
+ Task task = new Task(joblet, taId, han.getClass().getName());
IOperatorNodePushable operator = han.createPushRuntime(task, joblet.getEnvironment(op, partition), rdp,
- partition, opNumPartitions.get(op.getOperatorId()));
+ partition, td.getPartitionCount());
IPartitionCollector collector = null;
- List<IConnectorDescriptor> inputs = plan.getActivityInputConnectorDescriptors(tid.getTaskId()
- .getActivityId());
+ List<IConnectorDescriptor> inputs = plan.getActivityInputConnectorDescriptors(tid.getActivityId());
if (inputs != null) {
for (int i = 0; i < inputs.size(); ++i) {
if (i >= 1) {
throw new HyracksException("Multiple inputs to an activity not currently supported");
}
- if (!inputs.isEmpty()) {
- IConnectorDescriptor conn = inputs.get(0);
- OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
- .getOperatorId();
- OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
- .getOperatorId();
- RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
- collector = conn.createPartitionCollector(task, recordDesc, partition,
- opNumPartitions.get(producerOpId), opNumPartitions.get(consumerOpId));
+ IConnectorDescriptor conn = inputs.get(i);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("input: " + i + ": " + conn.getConnectorId());
}
+ RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
+ collector = conn.createPartitionCollector(task, recordDesc, partition,
+ td.getInputPartitionCounts()[i], td.getPartitionCount());
}
}
- List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getTaskId()
- .getActivityId());
+ List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getActivityId());
if (outputs != null) {
for (int i = 0; i < outputs.size(); ++i) {
- IConnectorDescriptor conn = outputs.get(i);
- OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
- .getOperatorId();
- OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
- .getOperatorId();
+ final IConnectorDescriptor conn = outputs.get(i);
RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
IPartitionWriterFactory pwFactory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return null;
+ return new PipelinedPartition(partitionManager, new PartitionId(jobId,
+ conn.getConnectorId(), partition, receiverIndex));
}
};
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("output: " + i + ": " + conn.getConnectorId());
+ }
IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
- opNumPartitions.get(producerOpId), opNumPartitions.get(consumerOpId));
+ td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
operator.setOutputFrameWriter(i, writer, recordDesc);
}
}
@@ -334,12 +334,11 @@
}
}
- public void notifyTaskFailed(UUID jobId, TaskAttemptId taskId, Exception exception) throws Exception {
+ public void notifyTaskFailed(UUID jobId, TaskAttemptId taskId, Exception exception) {
try {
ccs.notifyTaskFailure(jobId, taskId, id, exception);
} catch (Exception e) {
e.printStackTrace();
- throw e;
}
}
@@ -467,8 +466,10 @@
@Override
public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
Joblet ji = jobletMap.get(pid.getJobId());
- PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ctx, connectionManager,
- new InetSocketAddress(networkAddress.getIpAddress(), networkAddress.getPort()), pid, 1));
- ji.reportPartitionAvailability(channel);
+ if (ji != null) {
+ PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ctx, connectionManager,
+ new InetSocketAddress(networkAddress.getIpAddress(), networkAddress.getPort()), pid, 1));
+ ji.reportPartitionAvailability(channel);
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index a39d1b7..8f30e39 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -42,7 +42,9 @@
public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
private final Joblet joblet;
- private final TaskAttemptId taskId;
+ private final TaskAttemptId taskAttemptId;
+
+ private final String displayName;
private final IWorkspaceFileFactory fileFactory;
@@ -56,9 +58,10 @@
private volatile boolean aborted;
- public Task(Joblet joblet, TaskAttemptId taskId) {
+ public Task(Joblet joblet, TaskAttemptId taskId, String displayName) {
this.joblet = joblet;
- this.taskId = taskId;
+ this.taskAttemptId = taskId;
+ this.displayName = displayName;
fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
deallocatableRegistry = new DefaultDeallocatableRegistry();
counterMap = new HashMap<String, Counter>();
@@ -109,8 +112,8 @@
}
@Override
- public TaskAttemptId getTaskId() {
- return taskId;
+ public TaskAttemptId getTaskAttemptId() {
+ return taskAttemptId;
}
@Override
@@ -142,12 +145,17 @@
public void abort() {
aborted = true;
- collector.abort();
+ if (collector != null) {
+ collector.abort();
+ }
}
@Override
public void run() {
+ Thread ct = Thread.currentThread();
+ String threadName = ct.getName();
try {
+ ct.setName(displayName + ": " + taskAttemptId);
operator.initialize();
try {
if (collector != null) {
@@ -185,8 +193,12 @@
} finally {
operator.deinitialize();
}
+ joblet.notifyTaskComplete(this);
} catch (Exception e) {
-
+ e.printStackTrace();
+ joblet.notifyTaskFailed(this, e);
+ } finally {
+ ct.setName(threadName);
}
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
index fbddabd..c3a4e20 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/ConnectionManager.java
@@ -18,7 +18,6 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
-import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.SelectionKey;
@@ -26,8 +25,10 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -42,7 +43,7 @@
public class ConnectionManager {
private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
- private static final int INITIAL_MESSAGE_SIZE = 40;
+ static final int INITIAL_MESSAGE_SIZE = 40;
private final IHyracksRootContext ctx;
@@ -95,6 +96,7 @@
private final class ConnectionListenerThread extends Thread {
public ConnectionListenerThread() {
super("Hyracks NC Connection Listener");
+ setDaemon(true);
}
@Override
@@ -105,7 +107,9 @@
dataListener.addIncomingConnection(sc);
} catch (AsynchronousCloseException e) {
// do nothing
- e.printStackTrace();
+ if (!stopped) {
+ e.printStackTrace();
+ }
} catch (IOException e) {
e.printStackTrace();
}
@@ -117,19 +121,20 @@
private Selector selector;
private final List<SocketChannel> pendingIncomingConnections;
- private final List<SocketChannel> pendingNegotiations;
+ private final Set<SocketChannel> pendingNegotiations;
private final List<INetworkChannel> pendingOutgoingConnections;
private final List<INetworkChannel> pendingAbortConnections;
public DataListenerThread() {
super("Hyracks Data Listener Thread");
+ setDaemon(true);
try {
selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
pendingIncomingConnections = new ArrayList<SocketChannel>();
- pendingNegotiations = new ArrayList<SocketChannel>();
+ pendingNegotiations = new HashSet<SocketChannel>();
pendingOutgoingConnections = new ArrayList<INetworkChannel>();
pendingAbortConnections = new ArrayList<INetworkChannel>();
}
@@ -170,16 +175,12 @@
}
if (!pendingOutgoingConnections.isEmpty()) {
for (INetworkChannel nc : pendingOutgoingConnections) {
- SocketAddress rAddr = nc.getRemoteAddress();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
- int interestOps = SelectionKey.OP_READ;
- if (!sc.connect(rAddr)) {
- interestOps |= SelectionKey.OP_CONNECT;
- }
- SelectionKey scKey = sc.register(selector, interestOps);
+ SelectionKey scKey = sc.register(selector, 0);
scKey.attach(nc);
nc.setSelectionKey(scKey);
+ nc.notifyConnectionManagerRegistration();
}
pendingOutgoingConnections.clear();
}
@@ -207,6 +208,7 @@
buffer.flip();
if (buffer.remaining() >= INITIAL_MESSAGE_SIZE) {
PartitionId pid = readInitialMessage(buffer);
+ pendingNegotiations.remove(sc);
key.interestOps(0);
NetworkOutputChannel channel = new NetworkOutputChannel(ctx, 5);
channel.setSelectionKey(key);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
index 98215ee..61cd91f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/INetworkChannel.java
@@ -14,4 +14,6 @@
public SocketAddress getRemoteAddress();
public void abort();
+
+ public void notifyConnectionManagerRegistration() throws IOException;
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 24a6412..dcf530e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -21,11 +21,13 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Queue;
+import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -58,6 +60,8 @@
private Object attachment;
+ private ByteBuffer writeBuffer;
+
public NetworkInputChannel(IHyracksRootContext ctx, ConnectionManager connectionManager,
SocketAddress remoteAddress, PartitionId partitionId, int nBuffers) {
this.connectionManager = connectionManager;
@@ -94,11 +98,16 @@
@Override
public synchronized void recycleBuffer(ByteBuffer buffer) {
+ buffer.clear();
emptyQueue.add(buffer);
if (!eos && !aborted) {
int ops = key.interestOps();
if ((ops & SelectionKey.OP_READ) == 0) {
key.interestOps(ops | SelectionKey.OP_READ);
+ key.selector().wakeup();
+ if (currentBuffer == null) {
+ currentBuffer = emptyQueue.poll();
+ }
}
}
}
@@ -125,7 +134,17 @@
monitor.notifyEndOfStream(this);
return true;
}
- if (key.isReadable()) {
+ if (key.isConnectable()) {
+ if (socketChannel.finishConnect()) {
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
+ prepareForWrite();
+ }
+ } else if (key.isWritable()) {
+ socketChannel.write(writeBuffer);
+ if (writeBuffer.remaining() == 0) {
+ key.interestOps(SelectionKey.OP_READ);
+ }
+ } else if (key.isReadable()) {
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer("Before read: " + currentBuffer.position() + " " + currentBuffer.limit());
}
@@ -144,6 +163,11 @@
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("NetworkInputChannel: frame received: sender = " + partitionId.getSenderIndex());
}
+ if (currentBuffer.getInt(FrameHelper.getTupleCountOffset(currentBuffer.capacity())) == 0) {
+ eos = true;
+ monitor.notifyEndOfStream(this);
+ return true;
+ }
fullQueue.add(currentBuffer);
currentBuffer = emptyQueue.poll();
if (currentBuffer == null && key.isValid()) {
@@ -158,6 +182,26 @@
return false;
}
+ private void prepareForConnect() {
+ key.interestOps(SelectionKey.OP_CONNECT);
+ }
+
+ private void prepareForWrite() {
+ writeBuffer = ByteBuffer.allocate(ConnectionManager.INITIAL_MESSAGE_SIZE);
+ writeUUID(writeBuffer, partitionId.getJobId());
+ writeUUID(writeBuffer, partitionId.getConnectorDescriptorId().getId());
+ writeBuffer.putInt(partitionId.getSenderIndex());
+ writeBuffer.putInt(partitionId.getReceiverIndex());
+ writeBuffer.flip();
+
+ key.interestOps(SelectionKey.OP_WRITE);
+ }
+
+ private void writeUUID(ByteBuffer buffer, UUID uuid) {
+ buffer.putLong(uuid.getMostSignificantBits());
+ buffer.putLong(uuid.getLeastSignificantBits());
+ }
+
@Override
public void setSelectionKey(SelectionKey key) {
this.key = key;
@@ -185,4 +229,13 @@
public boolean aborted() {
return aborted;
}
+
+ @Override
+ public void notifyConnectionManagerRegistration() throws IOException {
+ if (socketChannel.connect(remoteAddress)) {
+ prepareForWrite();
+ } else {
+ prepareForConnect();
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index eb1ec89..74542b3 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -149,10 +149,8 @@
synchronized (this) {
fullQueue.add(destBuffer);
}
- int interestOps = key.interestOps();
- if ((interestOps & SelectionKey.OP_WRITE) == 0) {
- key.interestOps(interestOps | SelectionKey.OP_WRITE);
- }
+ key.interestOps(SelectionKey.OP_WRITE);
+ key.selector().wakeup();
}
@Override
@@ -163,5 +161,11 @@
@Override
public synchronized void close() throws HyracksDataException {
eos = true;
+ key.interestOps(SelectionKey.OP_WRITE);
+ key.selector().wakeup();
+ }
+
+ @Override
+ public void notifyConnectionManagerRegistration() throws IOException {
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/SortMergePartitionCollector.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/SortMergePartitionCollector.java
deleted file mode 100644
index 0977002..0000000
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/SortMergePartitionCollector.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.common.comm;
-
-import java.util.Collection;
-
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.PartitionChannel;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-
-public class SortMergePartitionCollector extends AbstractPartitionCollector {
- private final FrameTuplePairComparator tpc;
-
- private final FrameTupleAppender appender;
-
- private final RecordDescriptor recordDescriptor;
-
- private final int maxFramesLimit;
-
- private IInputChannel[] channels;
-
- public SortMergePartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId, int receiverIndex,
- int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDescriptor, int maxFramesLimit,
- int nSenders) {
- super(ctx, connectorId, receiverIndex);
- tpc = new FrameTuplePairComparator(sortFields, sortFields, comparators);
- appender = new FrameTupleAppender(ctx.getFrameSize());
- this.recordDescriptor = recordDescriptor;
- this.maxFramesLimit = maxFramesLimit;
- channels = new IInputChannel[nSenders];
- }
-
- @Override
- public void open() throws HyracksException {
- }
-
- @Override
- public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
-
- }
-
- @Override
- public IFrameReader getReader() throws HyracksException {
- return null;
- }
-
- @Override
- public void close() throws HyracksException {
-
- }
-
- @Override
- public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
- return null;
- }
-
- @Override
- public void abort() {
-
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/AbstractPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/AbstractPartitionCollector.java
similarity index 87%
rename from hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/AbstractPartitionCollector.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/AbstractPartitionCollector.java
index 4326a9f..fbc6fc4 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/AbstractPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/AbstractPartitionCollector.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.common.comm;
+package edu.uci.ics.hyracks.dataflow.std.collectors;
import java.util.UUID;
@@ -21,21 +21,21 @@
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
public abstract class AbstractPartitionCollector implements IPartitionCollector {
- protected final IHyracksTaskContext stageletContext;
+ protected final IHyracksTaskContext ctx;
protected final ConnectorDescriptorId connectorId;
protected final int receiverIndex;
public AbstractPartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId, int receiverIndex) {
- this.stageletContext = ctx;
+ this.ctx = ctx;
this.connectorId = connectorId;
this.receiverIndex = receiverIndex;
}
@Override
public UUID getJobId() {
- return stageletContext.getJobletContext().getJobId();
+ return ctx.getJobletContext().getJobId();
}
@Override
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
similarity index 81%
rename from hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/NonDeterministicPartitionCollector.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
index fb6a883..b640142 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/NonDeterministicPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.dataflow.common.comm;
+package edu.uci.ics.hyracks.dataflow.std.collectors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -35,31 +35,33 @@
private final BitSet expectedPartitions;
- private IInputChannel[] channels;
+ private final int nSenderPartitions;
- private BitSet frameAvailability;
+ private final IInputChannel[] channels;
- private int[] availableFrameCounts;
+ private final BitSet frameAvailability;
- private BitSet eosSenders;
+ private final int[] availableFrameCounts;
+
+ private final BitSet eosSenders;
private BitSet closedSenders;
private int lastReadSender;
public NonDeterministicPartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId,
- int receiverIndex, BitSet expectedPartitions) {
+ int receiverIndex, int nSenderPartitions, BitSet expectedPartitions) {
super(ctx, connectorId, receiverIndex);
this.expectedPartitions = expectedPartitions;
- int nSenders = expectedPartitions.size();
+ this.nSenderPartitions = nSenderPartitions;
reader = new FrameReader();
- channels = new IInputChannel[nSenders];
- eosSenders = new BitSet(nSenders);
- closedSenders = new BitSet(nSenders);
+ channels = new IInputChannel[nSenderPartitions];
+ eosSenders = new BitSet(nSenderPartitions);
+ closedSenders = new BitSet(nSenderPartitions);
closedSenders.or(expectedPartitions);
- closedSenders.flip(0, nSenders);
- frameAvailability = new BitSet(nSenders);
- availableFrameCounts = new int[nSenders];
+ closedSenders.flip(0, nSenderPartitions);
+ frameAvailability = new BitSet(nSenderPartitions);
+ availableFrameCounts = new int[nSenderPartitions];
}
@Override
@@ -68,13 +70,15 @@
}
@Override
- public synchronized void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
+ public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
for (PartitionChannel pc : partitions) {
PartitionId pid = pc.getPartitionId();
IInputChannel channel = pc.getInputChannel();
channel.setAttachment(pid);
channel.registerMonitor(reader);
- channels[pid.getSenderIndex()] = channel;
+ synchronized (this) {
+ channels[pid.getSenderIndex()] = channel;
+ }
channel.open();
}
}
@@ -99,7 +103,7 @@
while (true) {
switch (lastReadSender) {
default:
- lastReadSender = frameAvailability.nextSetBit(lastReadSender);
+ lastReadSender = frameAvailability.nextSetBit(lastReadSender + 1);
if (lastReadSender >= 0) {
break;
}
@@ -121,13 +125,14 @@
eosSenders.clear(i);
closedSenders.set(i);
}
- if (closedSenders.nextClearBit(0) < 0) {
+ int nextClosedBitIndex = closedSenders.nextClearBit(0);
+ if (nextClosedBitIndex < 0 || nextClosedBitIndex >= nSenderPartitions) {
return false;
}
try {
NonDeterministicPartitionCollector.this.wait();
} catch (InterruptedException e) {
- e.printStackTrace();
+ throw new HyracksDataException(e);
}
}
}
@@ -136,9 +141,11 @@
@Override
public void close() throws HyracksDataException {
synchronized (NonDeterministicPartitionCollector.this) {
- for (int i = closedSenders.nextClearBit(0); i >= 0; i = closedSenders.nextClearBit(i)) {
+ for (int i = closedSenders.nextClearBit(0); i >= 0 && i < nSenderPartitions; i = closedSenders
+ .nextClearBit(i + 1)) {
if (channels[i] != null) {
channels[i].close();
+ channels[i] = null;
}
}
}
@@ -169,7 +176,7 @@
@Override
public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
Collection<PartitionId> c = new ArrayList<PartitionId>(expectedPartitions.cardinality());
- for (int i = expectedPartitions.nextSetBit(0); i >= 0; i = expectedPartitions.nextSetBit(i)) {
+ for (int i = expectedPartitions.nextSetBit(0); i >= 0; i = expectedPartitions.nextSetBit(i + 1)) {
c.add(new PartitionId(getJobId(), getConnectorId(), i, getReceiverIndex()));
}
return c;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
new file mode 100644
index 0000000..70355c7
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
+
+public class SortMergePartitionCollector extends AbstractPartitionCollector {
+ private final int[] sortFields;
+
+ private final IBinaryComparator[] comparators;
+
+ private final RecordDescriptor recordDescriptor;
+
+ private final int maxConcurrentMerges;
+
+ private final IInputChannel[] channels;
+
+ private final int nSenders;
+
+ private final boolean stable;
+
+ private final FrameReader frameReader;
+
+ private final PartitionBatchManager pbm;
+
+ public SortMergePartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId, int receiverIndex,
+ int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDescriptor,
+ int maxConcurrentMerges, int nSenders, boolean stable) {
+ super(ctx, connectorId, receiverIndex);
+ this.sortFields = sortFields;
+ this.comparators = comparators;
+ this.recordDescriptor = recordDescriptor;
+ this.maxConcurrentMerges = maxConcurrentMerges;
+ channels = new IInputChannel[nSenders];
+ this.nSenders = nSenders;
+ this.stable = stable;
+ this.frameReader = new FrameReader();
+ pbm = new NonDeterministicPartitionBatchManager();
+ }
+
+ @Override
+ public void open() throws HyracksException {
+ }
+
+ @Override
+ public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
+ for (PartitionChannel pc : partitions) {
+ PartitionId pid = pc.getPartitionId();
+ IInputChannel channel = pc.getInputChannel();
+ InputChannelFrameReader channelReader = new InputChannelFrameReader(channel);
+ channel.registerMonitor(channelReader);
+ channel.setAttachment(channelReader);
+ synchronized (this) {
+ channels[pid.getSenderIndex()] = channel;
+ }
+ pbm.addPartition(pid.getSenderIndex());
+ channel.open();
+ }
+ }
+
+ @Override
+ public IFrameReader getReader() throws HyracksException {
+ return frameReader;
+ }
+
+ @Override
+ public void close() throws HyracksException {
+
+ }
+
+ @Override
+ public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
+ Collection<PartitionId> requiredPartitionIds = new ArrayList<PartitionId>();
+ for (int i = 0; i < nSenders; ++i) {
+ requiredPartitionIds.add(new PartitionId(getJobId(), getConnectorId(), i, receiverIndex));
+ }
+ return requiredPartitionIds;
+ }
+
+ @Override
+ public void abort() {
+
+ }
+
+ private abstract class PartitionBatchManager {
+ protected abstract void addPartition(int index);
+
+ protected abstract void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException;
+ }
+
+ private class NonDeterministicPartitionBatchManager extends PartitionBatchManager {
+ private List<IFrameReader> partitions;
+
+ private List<IFrameReader> batch;
+
+ private int requiredSize;
+
+ public NonDeterministicPartitionBatchManager() {
+ partitions = new ArrayList<IFrameReader>();
+ }
+
+ @Override
+ protected void addPartition(int index) {
+ synchronized (SortMergePartitionCollector.this) {
+ if (batch != null && batch.size() < requiredSize) {
+ batch.add((IFrameReader) channels[index].getAttachment());
+ if (batch.size() == requiredSize) {
+ SortMergePartitionCollector.this.notifyAll();
+ }
+ } else {
+ partitions.add((IFrameReader) channels[index].getAttachment());
+ }
+ }
+ }
+
+ @Override
+ protected void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException {
+ synchronized (SortMergePartitionCollector.this) {
+ if (partitions.size() <= size) {
+ batch.addAll(partitions);
+ partitions.clear();
+ } else if (partitions.size() > size) {
+ List<IFrameReader> sublist = partitions.subList(0, size);
+ batch.addAll(sublist);
+ sublist.clear();
+ }
+ if (batch.size() == size) {
+ return;
+ }
+ this.batch = batch;
+ this.requiredSize = size;
+ while (batch.size() < size) {
+ try {
+ SortMergePartitionCollector.this.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ this.batch = null;
+ }
+ }
+ }
+
+ private static class InputChannelFrameReader implements IFrameReader, IInputChannelMonitor {
+ private final IInputChannel channel;
+
+ private int availableFrames;
+
+ private boolean eos;
+
+ public InputChannelFrameReader(IInputChannel channel) {
+ this.channel = channel;
+ availableFrames = 0;
+ eos = false;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ }
+
+ @Override
+ public synchronized boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ while (!eos && availableFrames <= 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ if (eos) {
+ return false;
+ }
+ ByteBuffer srcBuffer = channel.getNextBuffer();
+ --availableFrames;
+ FrameUtils.copy(srcBuffer, buffer);
+ channel.recycleBuffer(srcBuffer);
+ return true;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ @Override
+ public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+ availableFrames += nFrames;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void notifyEndOfStream(IInputChannel channel) {
+ eos = true;
+ notifyAll();
+ }
+ }
+
+ private class FrameReader implements IFrameReader {
+ private RunMergingFrameReader merger;
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (maxConcurrentMerges >= nSenders) {
+ List<ByteBuffer> inFrames = new ArrayList<ByteBuffer>();
+ for (int i = 0; i < nSenders; ++i) {
+ inFrames.add(ByteBuffer.allocate(ctx.getFrameSize()));
+ }
+ List<IFrameReader> batch = new ArrayList<IFrameReader>();
+ pbm.getNextBatch(batch, nSenders);
+ merger = new RunMergingFrameReader(ctx, batch.toArray(new IFrameReader[nSenders]), inFrames,
+ sortFields, comparators, recordDescriptor);
+ } else {
+ // multi level merge.
+ throw new HyracksDataException("Not yet supported");
+ }
+ merger.open();
+ }
+
+ @Override
+ public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ buffer.position(buffer.capacity());
+ buffer.limit(buffer.capacity());
+ return merger.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ merger.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 6037393..12935af 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -24,8 +24,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.NonDeterministicPartitionCollector;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
private static final long serialVersionUID = 1L;
@@ -50,6 +50,7 @@
int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
BitSet expectedPartitions = new BitSet(nProducerPartitions);
expectedPartitions.set(0, nProducerPartitions);
- return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, expectedPartitions);
+ return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
+ expectedPartitions);
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
index a8bcfa5..54979f0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
@@ -24,8 +24,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.SortMergePartitionCollector;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.SortMergePartitionCollector;
public class MToNPartitioningMergingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
private static final long serialVersionUID = 1L;
@@ -33,13 +33,20 @@
private final ITuplePartitionComputerFactory tpcf;
private final int[] sortFields;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final boolean stable;
public MToNPartitioningMergingConnectorDescriptor(JobSpecification spec, ITuplePartitionComputerFactory tpcf,
int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
+ this(spec, tpcf, sortFields, comparatorFactories, false);
+ }
+
+ public MToNPartitioningMergingConnectorDescriptor(JobSpecification spec, ITuplePartitionComputerFactory tpcf,
+ int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, boolean stable) {
super(spec);
this.tpcf = tpcf;
this.sortFields = sortFields;
this.comparatorFactories = comparatorFactories;
+ this.stable = stable;
}
@Override
@@ -59,6 +66,6 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
return new SortMergePartitionCollector(ctx, getConnectorId(), index, sortFields, comparators, recordDesc,
- nProducerPartitions, nProducerPartitions);
+ nProducerPartitions, nProducerPartitions, stable);
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 957734c..5297a99 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -24,8 +24,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.NonDeterministicPartitionCollector;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
public class MToNReplicatingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
public MToNReplicatingConnectorDescriptor(JobSpecification spec) {
@@ -82,6 +82,7 @@
int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
BitSet expectedPartitions = new BitSet(nProducerPartitions);
expectedPartitions.set(0, nProducerPartitions);
- return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, expectedPartitions);
+ return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
+ expectedPartitions);
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index a15a64f..556466f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -28,8 +28,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.NonDeterministicPartitionCollector;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
public class OneToOneConnectorDescriptor extends AbstractConnectorDescriptor {
private static final long serialVersionUID = 1L;
@@ -50,7 +50,8 @@
int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
BitSet expectedPartitions = new BitSet(nProducerPartitions);
expectedPartitions.set(index);
- return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, expectedPartitions);
+ return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
+ expectedPartitions);
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 6f46932..a2f366b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -110,6 +110,7 @@
@Override
public void close() throws HyracksDataException {
+ System.err.println("close%%%%%%%%%%%%%%%%%%%%%%%%%" + joiner + " " + env);
env.set(JOINER, joiner);
}
@@ -138,6 +139,7 @@
@Override
public void open() throws HyracksDataException {
joiner = (InMemoryHashJoin) env.get(JOINER);
+ System.err.println("%%%%%%%%%%%%%%%%%%%%%%%%%" + joiner + " " + env);
writer.open();
}
@@ -148,6 +150,7 @@
@Override
public void close() throws HyracksDataException {
+ System.err.println("^^^^^^^^^^^^^^^^^^^^^^^^" + joiner);
joiner.closeJoin(writer);
writer.close();
env.set(JOINER, null);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index cf46c0c..34a8e5e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -46,6 +46,7 @@
appender.reset(writeBuffer, true);
if (fieldSlots != null && tupleData != null && tupleSize > 0)
appender.append(fieldSlots, tupleData, 0, tupleSize);
+ writer.open();
FrameUtils.flushFrame(writeBuffer, writer);
writer.close();
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index c860b2b..6871452 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -17,10 +17,12 @@
import java.nio.ByteBuffer;
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -28,7 +30,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
@@ -131,10 +132,14 @@
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- List<RunFileReader> runs = (List<RunFileReader>) env.get(RUNS);
+ List<IFrameReader> runs = (List<IFrameReader>) env.get(RUNS);
FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
- comparatorFactories, recordDescriptors[0], framesLimit, writer);
+ comparators, recordDescriptors[0], framesLimit, writer);
merger.process();
env.set(FRAMESORTER, null);
env.set(RUNS, null);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index 742a5f9..eee6f49 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -18,6 +18,7 @@
import java.util.LinkedList;
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -25,13 +26,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
public class ExternalSortRunGenerator implements IFrameWriter {
private final IHyracksTaskContext ctx;
private final FrameSorter frameSorter;
- private final List<RunFileReader> runs;
+ private final List<IFrameReader> runs;
private final int maxSortFrames;
public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
@@ -39,7 +39,7 @@
RecordDescriptor recordDesc, int framesLimit) {
this.ctx = ctx;
frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc);
- runs = new LinkedList<RunFileReader>();
+ runs = new LinkedList<IFrameReader>();
maxSortFrames = framesLimit - 1;
}
@@ -91,7 +91,7 @@
return frameSorter;
}
- public List<RunFileReader> getRuns() {
+ public List<IFrameReader> getRuns() {
return runs;
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 096adaa..8b8429c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -16,29 +16,24 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.List;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
public class ExternalSortRunMerger {
private final IHyracksTaskContext ctx;
private final FrameSorter frameSorter;
- private final List<RunFileReader> runs;
+ private final List<IFrameReader> runs;
private final int[] sortFields;
private final IBinaryComparator[] comparators;
private final RecordDescriptor recordDesc;
@@ -48,26 +43,21 @@
private ByteBuffer outFrame;
private FrameTupleAppender outFrameAppender;
- public ExternalSortRunMerger(IHyracksTaskContext ctx, FrameSorter frameSorter, List<RunFileReader> runs,
- int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDesc,
- int framesLimit, IFrameWriter writer) {
+ public ExternalSortRunMerger(IHyracksTaskContext ctx, FrameSorter frameSorter, List<IFrameReader> runs,
+ int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit,
+ IFrameWriter writer) {
this.ctx = ctx;
this.frameSorter = frameSorter;
this.runs = runs;
this.sortFields = sortFields;
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
+ this.comparators = comparators;
this.recordDesc = recordDesc;
this.framesLimit = framesLimit;
this.writer = writer;
}
- public void process(boolean doFinalPass) throws HyracksDataException {
- if (doFinalPass) {
- writer.open();
- }
+ public void process() throws HyracksDataException {
+ writer.open();
try {
if (runs.size() <= 0) {
if (frameSorter != null && frameSorter.getFrameCount() > 0) {
@@ -81,36 +71,25 @@
for (int i = 0; i < framesLimit - 1; ++i) {
inFrames.add(ctx.allocateFrame());
}
- int passCount = 0;
while (runs.size() > 0) {
- passCount++;
try {
- doPass(runs, passCount, doFinalPass);
+ doPass(runs);
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
}
} finally {
- if (doFinalPass) {
- writer.close();
- }
+ writer.close();
}
}
- public void process() throws HyracksDataException {
- process(true);
- }
-
// creates a new run from runs that can fit in memory.
- private void doPass(List<RunFileReader> runs, int passCount, boolean doFinalPass) throws HyracksDataException {
+ private void doPass(List<IFrameReader> runs) throws HyracksDataException {
FileReference newRun = null;
IFrameWriter writer = this.writer;
boolean finalPass = false;
if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
- if (!doFinalPass) {
- return;
- }
finalPass = true;
for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
inFrames.remove(i);
@@ -121,46 +100,19 @@
writer.open();
}
try {
- RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
- FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
- Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc,
- inFrames.size(), comparator);
- int[] tupleIndexes = new int[inFrames.size()];
+ IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
for (int i = 0; i < inFrames.size(); i++) {
- tupleIndexes[i] = 0;
- int runIndex = topTuples.peek().getRunid();
- runCursors[runIndex] = runs.get(runIndex);
- runCursors[runIndex].open();
- if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
- tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
- tupleAccessors[runIndex].reset(inFrames.get(runIndex));
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
- } else {
- closeRun(runIndex, runCursors, tupleAccessors);
- }
+ runCursors[i] = runs.get(i);
}
-
- while (!topTuples.areRunsExhausted()) {
- ReferenceEntry top = topTuples.peek();
- int runIndex = top.getRunid();
- FrameTupleAccessor fta = top.getAccessor();
- int tupleIndex = top.getTupleIndex();
-
- if (!outFrameAppender.append(fta, tupleIndex)) {
+ RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
+ comparators, recordDesc);
+ merger.open();
+ try {
+ while (merger.nextFrame(outFrame)) {
FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- if (!outFrameAppender.append(fta, tupleIndex)) {
- throw new IllegalStateException();
- }
}
-
- ++tupleIndexes[runIndex];
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
- }
- if (outFrameAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
+ } finally {
+ merger.close();
}
runs.subList(0, inFrames.size()).clear();
if (!finalPass) {
@@ -172,66 +124,4 @@
}
}
}
-
- private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
- boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
- if (exists) {
- topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
- } else {
- topTuples.pop();
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
-
- private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
- if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
- return false;
- } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
- ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
- if (runCursors[runIndex].nextFrame(buf)) {
- tupleIndexes[runIndex] = 0;
- return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
- } else {
- return false;
- }
- } else {
- return true;
- }
- }
-
- private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
- throws HyracksDataException {
- runCursors[index].close();
- runCursors[index] = null;
- tupleAccessor[index] = null;
- }
-
- private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
- return new Comparator<ReferenceEntry>() {
- public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
- FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
- int j1 = tp1.getTupleIndex();
- int j2 = tp2.getTupleIndex();
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < sortFields.length; ++f) {
- int fIdx = sortFields[f];
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
- };
- }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 603b194..c583d04 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -136,7 +136,6 @@
FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
ByteBuffer outFrame = ctx.allocateFrame();
- writer.open();
appender.reset(outFrame, true);
int n = tPointers.length / 4;
for (int ptr = 0; ptr < n; ++ptr) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
new file mode 100644
index 0000000..ddd24d8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+public class RunMergingFrameReader implements IFrameReader {
+ private final IHyracksTaskContext ctx;
+
+ private final IFrameReader[] runCursors;
+
+ private final List<ByteBuffer> inFrames;
+
+ private final int[] sortFields;
+
+ private final IBinaryComparator[] comparators;
+
+ private final RecordDescriptor recordDesc;
+
+ private final FrameTupleAppender outFrameAppender;
+
+ private ReferencedPriorityQueue topTuples;
+
+ private int[] tupleIndexes;
+
+ private FrameTupleAccessor[] tupleAccessors;
+
+ public RunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, List<ByteBuffer> inFrames,
+ int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc) {
+ this.ctx = ctx;
+ this.runCursors = runCursors;
+ this.inFrames = inFrames;
+ this.sortFields = sortFields;
+ this.comparators = comparators;
+ this.recordDesc = recordDesc;
+ outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+ Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+ topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, inFrames.size(), comparator);
+ tupleIndexes = new int[inFrames.size()];
+ for (int i = 0; i < inFrames.size(); i++) {
+ tupleIndexes[i] = 0;
+ int runIndex = topTuples.peek().getRunid();
+ runCursors[runIndex].open();
+ if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
+ tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+ tupleAccessors[runIndex].reset(inFrames.get(runIndex));
+ setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+ } else {
+ closeRun(runIndex, runCursors, tupleAccessors);
+ }
+ }
+ }
+
+ @Override
+ public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ outFrameAppender.reset(buffer, true);
+ while (!topTuples.areRunsExhausted()) {
+ ReferenceEntry top = topTuples.peek();
+ int runIndex = top.getRunid();
+ FrameTupleAccessor fta = top.getAccessor();
+ int tupleIndex = top.getTupleIndex();
+
+ if (!outFrameAppender.append(fta, tupleIndex)) {
+ return true;
+ }
+
+ ++tupleIndexes[runIndex];
+ setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+ }
+
+ if (outFrameAppender.getTupleCount() > 0) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ for (int i = 0; i < inFrames.size(); ++i) {
+ closeRun(i, runCursors, tupleAccessors);
+ }
+ }
+
+ private void setNextTopTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
+ boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+ if (exists) {
+ topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
+ } else {
+ topTuples.pop();
+ closeRun(runIndex, runCursors, tupleAccessors);
+ }
+ }
+
+ private boolean hasNextTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
+ if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
+ return false;
+ } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
+ ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
+ if (runCursors[runIndex].nextFrame(buf)) {
+ tupleIndexes[runIndex] = 0;
+ return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+ } else {
+ return false;
+ }
+ } else {
+ return true;
+ }
+ }
+
+ private void closeRun(int index, IFrameReader[] runCursors, IFrameTupleAccessor[] tupleAccessors)
+ throws HyracksDataException {
+ if (runCursors[index] != null) {
+ runCursors[index].close();
+ runCursors[index] = null;
+ tupleAccessors[index] = null;
+ }
+ }
+
+ private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+ return new Comparator<ReferenceEntry>() {
+ public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
+ FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
+ FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
+ int j1 = tp1.getTupleIndex();
+ int j2 = tp2.getTupleIndex();
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < sortFields.length; ++f) {
+ int fIdx = sortFields[f];
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index b2b75b9..a84d793 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -42,7 +42,7 @@
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
ccConfig.port = 39001;
- ccConfig.profileDumpPeriod = 1000;
+ ccConfig.profileDumpPeriod = 10000;
cc = new ClusterControllerService(ccConfig);
cc.start();
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
index ec6da88..ade333e 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
@@ -51,6 +51,7 @@
RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), inputRecDesc);
writeBuffer = btreeOpHelper.getHyracksStageletContext().allocateFrame();
+ writer.open();
try {
btreeOpHelper.init();
btreeOpHelper.getBTree().open(btreeOpHelper.getBTreeFileId());
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 819dea6..84be67a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -87,9 +87,10 @@
cursorFrame = opDesc.getLeafFactory().getFrame();
cursor = new RangeSearchCursor(cursorFrame);
+ writer.open();
try {
-
btreeOpHelper.init();
+
btree = btreeOpHelper.getBTree();
// construct range predicate
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index c6f6e5b..37e7bd6 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -62,6 +62,7 @@
builderDos = builder.getDataOutput();
appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(writeBuffer, true);
+ writer.open();
}
@Override
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index c2693ab..a9bba5a 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -279,6 +279,39 @@
}
}
+ private String dumpState() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("Buffer cache state\n");
+ buffer.append("Page Size: ").append(pageSize).append('\n');
+ buffer.append("Number of physical pages: ").append(numPages).append('\n');
+ buffer.append("Hash table size: ").append(pageMap.length).append('\n');
+ buffer.append("Page Map:\n");
+ int nCachedPages = 0;
+ for (int i = 0; i < pageMap.length; ++i) {
+ CacheBucket cb = pageMap[i];
+ cb.bucketLock.lock();
+ try {
+ CachedPage cp = cb.cachedPage;
+ if (cp != null) {
+ buffer.append(" ").append(i).append('\n');
+ while (cp != null) {
+ buffer.append(" ").append(cp.cpid).append(" -> [")
+ .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
+ .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
+ .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+ .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
+ cp = cp.next;
+ ++nCachedPages;
+ }
+ }
+ } finally {
+ cb.bucketLock.unlock();
+ }
+ }
+ buffer.append("Number of cached pages: ").append(nCachedPages).append('\n');
+ return buffer.toString();
+ }
+
private void read(CachedPage cPage) throws HyracksDataException {
BufferedFileHandle fInfo = getFileInfo(cPage);
cPage.buffer.clear();
@@ -568,6 +601,7 @@
public void closeFile(int fileId) throws HyracksDataException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Closing file: " + fileId + " in cache: " + this);
+ LOGGER.info(dumpState());
}
synchronized (fileInfoMap) {
BufferedFileHandle fInfo = fileInfoMap.get(fileId);
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index 589352a..cbab080 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -85,7 +85,7 @@
}
@Override
- public TaskAttemptId getTaskId() {
+ public TaskAttemptId getTaskAttemptId() {
return taskId;
}
}
\ No newline at end of file