merge from master; also brings in the getJobInfo() client API (GET_JOB_INFO, JobInfo) and operator-location tracking in JobRun
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 06ef1097..addfb2e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -33,118 +33,136 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class AlgebricksMetaOperatorDescriptor extends
+ AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- // array of factories for building the local runtime pipeline
- private final AlgebricksPipeline pipeline;
+ // array of factories for building the local runtime pipeline
+ private final AlgebricksPipeline pipeline;
- public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity,
- IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) {
- super(spec, inputArity, outputArity);
- if (outputArity == 1) {
- this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
- }
- this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors);
- }
+ public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ int inputArity, int outputArity,
+ IPushRuntimeFactory[] runtimeFactories,
+ RecordDescriptor[] internalRecordDescriptors) {
+ super(spec, inputArity, outputArity);
+ if (outputArity == 1) {
+ this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
+ }
+ this.pipeline = new AlgebricksPipeline(runtimeFactories,
+ internalRecordDescriptors);
+ }
- public AlgebricksPipeline getPipeline() {
- return pipeline;
- }
+ public AlgebricksPipeline getPipeline() {
+ return pipeline;
+ }
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject json = super.toJSON();
- json.put("micro-operators", pipeline.getRuntimeFactories());
- return json;
- }
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject json = super.toJSON();
+ json.put("micro-operators", pipeline.getRuntimeFactories());
+ return json;
+ }
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Asterix { \n");
- for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
- sb.append(" " + f.toString() + ";\n");
- }
- sb.append("}");
- // sb.append(super.getInputArity());
- // sb.append(";");
- // sb.append(super.getOutputArity());
- // sb.append(";");
- return sb.toString();
- }
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Asterix { \n");
+ for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+ sb.append(" " + f.toString() + ";\n");
+ }
+ sb.append("}");
+ // sb.append(super.getInputArity());
+ // sb.append(";");
+ // sb.append(super.getOutputArity());
+ // sb.append(";");
+ return sb.toString();
+ }
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- if (inputArity == 0) {
- return createSourceInputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- } else {
- return createOneInputOneOutputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- }
- }
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ if (inputArity == 0) {
+ return createSourceInputPushRuntime(ctx, recordDescProvider,
+ partition, nPartitions);
+ } else {
+ return createOneInputOneOutputPushRuntime(ctx, recordDescProvider,
+ partition, nPartitions);
+ }
+ }
- private IOperatorNodePushable createSourceInputPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private IOperatorNodePushable createSourceInputPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
- public void initialize() throws HyracksDataException {
- IFrameWriter startOfPipeline;
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
+ public void initialize() throws HyracksDataException {
+ IFrameWriter startOfPipeline;
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+ : null;
- PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
- pipelineOutputRecordDescriptor);
- try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- startOfPipeline.open();
- startOfPipeline.close();
- }
- };
- }
+ PipelineAssembler pa = new PipelineAssembler(pipeline,
+ inputArity, outputArity, null,
+ pipelineOutputRecordDescriptor);
+ try {
+ startOfPipeline = pa.assemblePipeline(writer, ctx);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ startOfPipeline.open();
+ startOfPipeline.close();
+ }
+ };
+ }
- private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private IOperatorNodePushable createOneInputOneOutputPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private IFrameWriter startOfPipeline;
+ private IFrameWriter startOfPipeline;
- @Override
- public void open() throws HyracksDataException {
- if (startOfPipeline == null) {
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
- RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
- AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
- PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
- pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
- try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- startOfPipeline.open();
- }
+ @Override
+ public void open() throws HyracksDataException {
+ if (startOfPipeline == null) {
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+ : null;
+ RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider
+ .getInputRecordDescriptor(
+ AlgebricksMetaOperatorDescriptor.this
+ .getActivityId(), 0);
+ PipelineAssembler pa = new PipelineAssembler(pipeline,
+ inputArity, outputArity,
+ pipelineInputRecordDescriptor,
+ pipelineOutputRecordDescriptor);
+ try {
+ startOfPipeline = pa.assemblePipeline(writer, ctx);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ startOfPipeline.open();
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- startOfPipeline.nextFrame(buffer);
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
+ startOfPipeline.nextFrame(buffer);
+ }
- @Override
- public void close() throws HyracksDataException {
- startOfPipeline.close();
- }
+ @Override
+ public void close() throws HyracksDataException {
+ startOfPipeline.close();
+ }
- @Override
- public void fail() throws HyracksDataException {
- startOfPipeline.fail();
- }
- };
- }
+ @Override
+ public void fail() throws HyracksDataException {
+ startOfPipeline.fail();
+ }
+ };
+ }
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index 7a795da..015e3a9 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -58,4 +58,5 @@
* @return The Cluster Controller Context.
*/
public ICCContext getCCContext();
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 55d80e2..9ff741e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -31,6 +31,7 @@
GET_CLUSTER_TOPOLOGY,
CREATE_JOB,
GET_JOB_STATUS,
+ GET_JOB_INFO,
START_JOB,
GET_DATASET_DIRECTORY_SERIVICE_INFO,
GET_DATASET_RESULT_STATUS,
@@ -76,6 +77,25 @@
}
}
+ public static class GetJobInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public GetJobInfoFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_JOB_INFO;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
public static class StartJobFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 2bac49f..98f27f2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -103,4 +104,11 @@
deploymentId);
rpci.call(ipcHandle, dbf);
}
+
+ @Override
+ public JobInfo getJobInfo(JobId jobId) throws Exception {
+ HyracksClientInterfaceFunctions.GetJobInfoFunction gjif = new HyracksClientInterfaceFunctions.GetJobInfoFunction(
+ jobId);
+ return (JobInfo) rpci.call(ipcHandle, gjif);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 98836cd..1916360 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -36,6 +36,7 @@
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -148,7 +149,7 @@
binaryURLs.add(new URL(url));
}
}
- /**deploy the URLs to the CC and NCs*/
+ /** deploy the URLs to the CC and NCs */
hci.deployBinary(binaryURLs, deploymentId);
return deploymentId;
}
@@ -176,4 +177,9 @@
EnumSet<JobFlag> jobFlags) throws Exception {
return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
}
+
+ @Override
+ public JobInfo getJobInfo(JobId jobId) throws Exception {
+ return hci.getJobInfo(jobId);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 2820cdf..1e44e91 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -44,6 +45,16 @@
public JobStatus getJobStatus(JobId jobId) throws Exception;
/**
+ * Gets detailed information about the specified Job.
+ *
+ * @param jobId
+ * JobId of the Job
+ * @return {@link JobInfo}
+ * @throws Exception
+ */
+ public JobInfo getJobInfo(JobId jobId) throws Exception;
+
+ /**
* Start the specified Job.
*
* @param appName
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 737152b..d0eeada 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -46,4 +47,7 @@
public void unDeployBinary(DeploymentId deploymentId) throws Exception;
public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
+
+ public JobInfo getJobInfo(JobId jobId) throws Exception;
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 79efaa0..48d7275 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -43,6 +43,11 @@
}
@Override
+ public JobSpecification getJobSpecification() {
+ return spec;
+ }
+
+ @Override
public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
final ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException {
final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index 978348b..99a0712 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -23,4 +23,6 @@
public interface IActivityClusterGraphGeneratorFactory extends Serializable {
public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException;
+
+ public JobSpecification getJobSpecification();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
new file mode 100644
index 0000000..812a32a
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
@@ -0,0 +1,74 @@
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+public class JobInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private JobStatus status;
+
+ private List<Exception> exceptions;
+
+ private JobStatus pendingStatus;
+
+ private List<Exception> pendingExceptions;
+
+ private Map<OperatorDescriptorId, List<String>> operatorLocations;
+
+ public JobInfo(JobId jobId, JobStatus jobStatus, Map<OperatorDescriptorId, List<String>> operatorLocations) {
+ this.jobId = jobId;
+ this.operatorLocations = operatorLocations;
+ this.status = jobStatus;
+ }
+
+ public JobStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(JobStatus status) {
+ this.status = status;
+ }
+
+ public List<Exception> getExceptions() {
+ return exceptions;
+ }
+
+ public void setExceptions(List<Exception> exceptions) {
+ this.exceptions = exceptions;
+ }
+
+ public JobStatus getPendingStatus() {
+ return pendingStatus;
+ }
+
+ public void setPendingStatus(JobStatus pendingStatus) {
+ this.pendingStatus = pendingStatus;
+ }
+
+ public List<Exception> getPendingExceptions() {
+ return pendingExceptions;
+ }
+
+ public void setPendingExceptions(List<Exception> pendingExceptions) {
+ this.pendingExceptions = pendingExceptions;
+ }
+
+ public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+ return operatorLocations;
+ }
+
+ public void setOperatorLocations(Map<OperatorDescriptorId, List<String>> operatorLocations) {
+ this.operatorLocations = operatorLocations;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 0463350..5ec9c19 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -42,6 +42,7 @@
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
@@ -55,6 +56,7 @@
import edu.uci.ics.hyracks.control.cc.work.CliUnDeployBinaryWork;
import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
+import edu.uci.ics.hyracks.control.cc.work.GetJobInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
@@ -350,6 +352,13 @@
return;
}
+ case GET_JOB_INFO: {
+ HyracksClientInterfaceFunctions.GetJobInfoFunction gjif = (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
+ workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjif.getJobId(),
+ new IPCResponder<JobInfo>(handle, mid)));
+ return;
+ }
+
case START_JOB: {
HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
JobId jobId = createJobId();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index bae0eb5..1f68210 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.cc.job;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.ArrayList;
import java.util.EnumSet;
@@ -29,6 +30,7 @@
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -87,6 +89,8 @@
private List<Exception> pendingExceptions;
+ private Map<OperatorDescriptorId, List<String>> operatorLocations;
+
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
this.deploymentId = deploymentId;
@@ -101,6 +105,7 @@
cleanupPendingNodeIds = new HashSet<String>();
profile = new JobProfile(jobId);
connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+ operatorLocations = new HashMap<OperatorDescriptorId, List<String>>();
}
public DeploymentId getDeploymentId() {
@@ -178,6 +183,15 @@
this.endTime = endTime;
}
+ public void registerOperatorLocation(OperatorDescriptorId op, String location) {
+ List<String> locations = operatorLocations.get(op);
+ if (locations == null) {
+ locations = new ArrayList<String>();
+ operatorLocations.put(op, locations);
+ }
+ locations.add(location);
+ }
+
@Override
public synchronized void waitForCompletion() throws Exception {
while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) {
@@ -379,4 +393,8 @@
return result;
}
-}
\ No newline at end of file
+
+ public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+ return operatorLocations;
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index fd6360a..d2c018f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -92,6 +93,7 @@
public void startJob() throws HyracksException {
startRunnableActivityClusters();
+ ccs.getApplicationContext().notifyJobStart(jobRun.getJobId());
}
private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
@@ -326,6 +328,8 @@
tads = new ArrayList<TaskAttemptDescriptor>();
taskAttemptMap.put(nodeId, tads);
}
+ OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId();
+ jobRun.registerOperatorLocation(opId, nodeId);
ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(),
apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java
new file mode 100644
index 0000000..c2e08f2
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetJobInfoWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+ private final JobId jobId;
+ private final IResultCallback<JobInfo> callback;
+
+ public GetJobInfoWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobInfo> callback) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.callback = callback;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ JobRun run = ccs.getActiveRunMap().get(jobId);
+ if (run == null) {
+ run = ccs.getRunMapArchive().get(jobId);
+ }
+ JobInfo info = run == null ? null
+ : new JobInfo(run.getJobId(), run.getStatus(), run.getOperatorLocations());
+ callback.setValue(info);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index c7244fb..8e5b672 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -38,10 +38,10 @@
public int httpPort = 16001;
@Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
- public int heartbeatPeriod = 10000;
+ public int heartbeatPeriod = 5000;
@Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
- public int maxHeartbeatLapsePeriods = 5;
+ public int maxHeartbeatLapsePeriods = 1;
@Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0)")
public int profileDumpPeriod = 0;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 43129f5..1d4daf7 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -123,4 +123,4 @@
}
}
-}
\ No newline at end of file
+}
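
Usage sketch for the new client API. This is an illustrative, hedged example rather than part of the patch: it assumes an IHyracksClientConnection and a JobId obtained elsewhere (e.g. from startJob), and the class/method names JobInfoExample and printJobInfo are hypothetical; only getJobInfo() and the JobInfo getters come from this change.

import java.util.List;
import java.util.Map;

import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobInfo;

// Hypothetical helper: prints a job's status and the node(s) each operator ran on.
public class JobInfoExample {
    public static void printJobInfo(IHyracksClientConnection hcc, JobId jobId) throws Exception {
        // getJobInfo() is the RPC introduced by this change; GetJobInfoWork returns null
        // when the JobId is found in neither the active nor the archived run map.
        JobInfo info = hcc.getJobInfo(jobId);
        if (info == null) {
            System.out.println("No such job: " + jobId);
            return;
        }
        System.out.println("Status: " + info.getStatus());
        // Locations are recorded per operator by the scheduler via JobRun.registerOperatorLocation().
        for (Map.Entry<OperatorDescriptorId, List<String>> e : info.getOperatorLocations().entrySet()) {
            System.out.println(e.getKey() + " executed on " + e.getValue());
        }
    }
}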