Prepare AsterixDB for Pre-Distributed Jobs
Change-Id: Id809f4b563bbed808c7764d1af664a15919db35b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1366
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 7f25896..51ed40d 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -23,9 +23,10 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-public abstract class ActiveSourceOperatorNodePushable extends AbstractOperatorNodePushable implements IActiveRuntime {
+public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable
+ implements IActiveRuntime {
protected final IHyracksTaskContext ctx;
protected final ActiveManager activeManager;
@@ -108,10 +109,6 @@
}
}
- @Override
- public final int getInputArity() {
- return 0;
- }
@Override
public final IFrameWriter getInputFrameWriter(int index) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index ef0af26..780e205 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -107,17 +107,25 @@
private final byte[] acggfBytes;
private final EnumSet<JobFlag> jobFlags;
private final DeploymentId deploymentId;
+ private final JobId jobId;
- public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
- this.acggfBytes = acggfBytes;
- this.jobFlags = jobFlags;
- this.deploymentId = null;
- }
-
- public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
+ public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) {
this.acggfBytes = acggfBytes;
this.jobFlags = jobFlags;
this.deploymentId = deploymentId;
+ this.jobId = jobId;
+ }
+
+ public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) {
+ this(null, acggfBytes, jobFlags, jobId);
+ }
+
+ public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
+ this(null, acggfBytes, jobFlags, null);
+ }
+
+ public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
+ this(deploymentId, acggfBytes, jobFlags, null);
}
@Override
@@ -125,6 +133,10 @@
return FunctionId.START_JOB;
}
+ public JobId getJobId() {
+ return jobId;
+ }
+
public byte[] getACGGFBytes() {
return acggfBytes;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index f8c8512..c049007 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -69,6 +69,13 @@
}
@Override
+ public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception {
+ HyracksClientInterfaceFunctions.StartJobFunction sjf =
+ new HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags, jobId);
+ return (JobId) rpci.call(ipcHandle, sjf);
+ }
+
+ @Override
public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception {
HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
deploymentId, acggfBytes, jobFlags);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 8c0557e..eb92c37 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -102,10 +102,22 @@
return startJob(jsacggf, jobFlags);
}
+ @Override
+ public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception {
+ JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
+ new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
+ return startJob(jsacggf, jobFlags, jobId);
+ }
+
public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
}
+ public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags, JobId jobId)
+ throws Exception {
+ return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags, jobId);
+ }
+
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
return hci.getDatasetDirectoryServiceInfo();
}
@@ -179,7 +191,8 @@
@Override
public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
- EnumSet<JobFlag> jobFlags) throws Exception {
+ EnumSet<JobFlag> jobFlags)
+ throws Exception {
return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index fd4d21b..031896e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -87,6 +87,21 @@
*
* @param appName
* Name of the application
+ * @param jobSpec
+ * Job Specification
+ * @param jobFlags
+ * Flags
+ * @param jobId
+ * Id of a pre-distributed job to run (the same value will be returned); if null, a new job id is created
+ * @throws Exception
+ */
+ public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param appName
+ * Name of the application
* @param acggf
* Activity Cluster Graph Generator Factory
* @param jobFlags
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 3f3c120..39063c6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -38,6 +38,8 @@
public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
+ public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception;
+
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
public void waitForCompletion(JobId jobId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index a33c6c9a..1656c51 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -19,5 +19,6 @@
package org.apache.hyracks.api.job;
public enum JobFlag {
- PROFILE_RUNTIME
+ PROFILE_RUNTIME,
+ STORE_JOB
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 01c3bf5..26beb63 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -84,9 +84,15 @@
case START_JOB:
HyracksClientInterfaceFunctions.StartJobFunction sjf =
(HyracksClientInterfaceFunctions.StartJobFunction) fn;
- JobId jobId = jobIdFactory.create();
- ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(),
- sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
+ JobId jobId = sjf.getJobId();
+ byte[] acggfBytes = null;
+ if (jobId == null) {
+ jobId = jobIdFactory.create();
+ }
+ //TODO: only send the ACGGF bytes when the jobId is null
+ acggfBytes = sjf.getACGGFBytes();
+ ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
+ jobId, new IPCResponder<JobId>(handle, mid)));
break;
case GET_DATASET_DIRECTORY_SERIVICE_INFO:
ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs,
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index f00303f..bdb13e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -47,6 +47,7 @@
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
@@ -114,6 +115,8 @@
private final Map<JobId, Joblet> jobletMap;
+ private final Map<JobId, ActivityClusterGraph> activityClusterGraphMap;
+
private ExecutorService executor;
private NodeParameters nodeParameters;
@@ -167,6 +170,7 @@
lccm = new LifeCycleComponentManager();
workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
jobletMap = new Hashtable<>();
+ activityClusterGraphMap = new Hashtable<>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
new File(new File(NodeControllerService.class.getName()), id));
@@ -356,6 +360,10 @@
return jobletMap;
}
+ public Map<JobId, ActivityClusterGraph> getActivityClusterGraphMap() {
+ return activityClusterGraphMap;
+ }
+
public NetworkManager getNetworkManager() {
return netManager;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index ad9481d..d27caf2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -186,11 +186,18 @@
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji == null) {
- if (acgBytes == null) {
- throw new HyracksException("Joblet was not found. This job was most likely aborted.");
+ Map<JobId, ActivityClusterGraph> acgMap = ncs.getActivityClusterGraphMap();
+ ActivityClusterGraph acg = acgMap.get(jobId);
+ if (acg == null) {
+ if (acgBytes == null) {
+ throw new HyracksException("Joblet was not found. This job was most likely aborted.");
+ }
+ acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
+ if (flags.contains(JobFlag.STORE_JOB)) {
+ //TODO: Right now the map is append-only
+ acgMap.put(jobId, acg);
+ }
}
- ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId,
- appCtx);
ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
jobletMap.put(jobId, ji);
}