ASTERIXDB-1747 Implemented full lifecycle capabilities for pre-distributed jobs
Added distribute and destroy functionality
Removed serialization and bytes when running pred-distributed jobs
Cleaned up methods
Enabled Mockito testing for CCS and NCS
Added Unit Test for Distributed Jobs
Change-Id: I59c3422d5c1ab7756a6a4685ac527dfe50434954
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1377
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index 5ff02c7..0d1d8ab 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -35,13 +35,13 @@
private static final Logger LOGGER = Logger.getLogger(ActiveJobNotificationHandler.class.getName());
private static final boolean DEBUG = false;
private final LinkedBlockingQueue<ActiveEvent> eventInbox;
- private final Map<EntityId, IActiveEntityEventsListener> entityEventListener;
+ private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners;
private final Map<JobId, ActiveJob> jobId2ActiveJobInfos;
private ActiveJobNotificationHandler() {
this.eventInbox = new LinkedBlockingQueue<>();
this.jobId2ActiveJobInfos = new HashMap<>();
- this.entityEventListener = new HashMap<>();
+ this.entityEventListeners = new HashMap<>();
}
@Override
@@ -53,15 +53,14 @@
ActiveEvent event = getEventInbox().take();
ActiveJob jobInfo = jobId2ActiveJobInfos.get(event.getJobId());
EntityId entityId = jobInfo.getEntityId();
- IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+ IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
if (DEBUG) {
LOGGER.log(Level.WARNING, "Next event is of type " + event.getEventKind());
LOGGER.log(Level.WARNING, "Notifying the listener");
}
listener.notify(event);
if (event.getEventKind() == EventKind.JOB_FINISH) {
- removeFinishedJob(event.getJobId());
- removeInactiveListener(listener);
+ removeJob(event.getJobId(), listener);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -72,11 +71,18 @@
LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
}
- private void removeFinishedJob(JobId jobId) {
- if (DEBUG) {
- LOGGER.log(Level.WARNING, "Removing the job");
+ public void removeJob(JobId jobId, IActiveEntityEventsListener listener) {
+ removeFinishedJob(jobId, listener);
+ removeInactiveListener(listener);
+ }
+
+ private void removeFinishedJob(JobId jobId, IActiveEntityEventsListener listener) {
+ if (!listener.isEntityActive()) {
+ if (DEBUG) {
+ LOGGER.log(Level.WARNING, "Remove job" + jobId);
+ }
+ jobId2ActiveJobInfos.remove(jobId);
}
- jobId2ActiveJobInfos.remove(jobId);
}
private void removeInactiveListener(IActiveEntityEventsListener listener) {
@@ -84,17 +90,17 @@
if (DEBUG) {
LOGGER.log(Level.WARNING, "Removing the listener since it is not active anymore");
}
- entityEventListener.remove(listener.getEntityId());
+ entityEventListeners.remove(listener.getEntityId());
}
}
public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
if (DEBUG) {
LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
- IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+ IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
LOGGER.log(Level.WARNING, "Listener found: " + listener);
}
- return entityEventListener.get(entityId);
+ return entityEventListeners.get(entityId);
}
public synchronized ActiveJob[] getActiveJobs() {
@@ -141,7 +147,7 @@
ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId);
if (jobInfo != null) {
EntityId entityId = jobInfo.getEntityId();
- IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+ IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
listener.notifyJobCreation(jobId, jobSpecification);
if (DEBUG) {
LOGGER.log(Level.WARNING, "Listener was notified" + jobId);
@@ -161,9 +167,9 @@
public synchronized IActiveEntityEventsListener[] getEventListeners() {
if (DEBUG) {
LOGGER.log(Level.WARNING, "getEventListeners() was called");
- LOGGER.log(Level.WARNING, "returning " + entityEventListener.size() + " Listeners");
+ LOGGER.log(Level.WARNING, "returning " + entityEventListeners.size() + " Listeners");
}
- return entityEventListener.values().toArray(new IActiveEntityEventsListener[entityEventListener.size()]);
+ return entityEventListeners.values().toArray(new IActiveEntityEventsListener[entityEventListeners.size()]);
}
public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException {
@@ -172,11 +178,11 @@
"registerListener(IActiveEntityEventsListener listener) was called for the entity "
+ listener.getEntityId());
}
- if (entityEventListener.containsKey(listener.getEntityId())) {
+ if (entityEventListeners.containsKey(listener.getEntityId())) {
throw new HyracksDataException(
"Active Entity Listener " + listener.getEntityId() + " is already registered");
}
- entityEventListener.put(listener.getEntityId(), listener);
+ entityEventListeners.put(listener.getEntityId(), listener);
}
public synchronized void monitorJob(JobId jobId, ActiveJob activeJob) {
@@ -185,7 +191,7 @@
boolean found = jobId2ActiveJobInfos.get(jobId) != null;
LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
}
- if (entityEventListener.containsKey(activeJob.getEntityId())) {
+ if (entityEventListeners.containsKey(activeJob.getEntityId())) {
if (jobId2ActiveJobInfos.containsKey(jobId)) {
LOGGER.severe("Job is already being monitored for job: " + jobId);
return;
@@ -205,7 +211,7 @@
"unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
+ listener.getEntityId());
}
- IActiveEntityEventsListener registeredListener = entityEventListener.remove(listener.getEntityId());
+ IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
if (registeredListener == null) {
throw new HyracksDataException(
"Active Entity Listener " + listener.getEntityId() + " hasn't been registered");
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
index 06e9ad1..fad30fa 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
@@ -26,9 +26,9 @@
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
public class ActiveLifecycleListener implements IJobLifecycleListener {
@@ -65,8 +65,8 @@
}
@Override
- public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
- ActiveJobNotificationHandler.INSTANCE.notifyJobCreation(jobId, acggf.getJobSpecification());
+ public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+ ActiveJobNotificationHandler.INSTANCE.notifyJobCreation(jobId, spec);
}
public void receive(ActivePartitionMessage message) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3c69d83..978c2eb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -233,6 +233,10 @@
this.executorService = executorService;
}
+ public SessionConfig getSessionConfig() {
+ return sessionConfig;
+ }
+
protected List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
List<FunctionDecl> functionDecls = new ArrayList<>();
for (Statement st : statements) {
@@ -343,7 +347,7 @@
handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false);
break;
case Statement.Kind.DELETE:
- handleDeleteStatement(metadataProvider, stmt, hcc);
+ handleDeleteStatement(metadataProvider, stmt, hcc, false);
break;
case Statement.Kind.CREATE_PRIMARY_FEED:
case Statement.Kind.CREATE_SECONDARY_FEED:
@@ -1403,7 +1407,7 @@
// prepare job spec(s) that would disconnect any active feeds involving the dataset.
IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
for (IActiveEntityEventsListener listener : activeListeners) {
- if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
+ if (listener.isEntityUsingDataset(dataverseName, datasetName) && listener.isEntityActive()) {
throw new CompilationException(
"Can't drop dataset since it is connected to active entity: " + listener.getEntityId());
}
@@ -1824,7 +1828,7 @@
}
}
- public void handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
+ public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
IStatementExecutor.Stats stats, boolean compileOnly) throws Exception {
@@ -1856,7 +1860,7 @@
final JobSpecification jobSpec = rewriteCompileInsertUpsert(hcc, metadataProvider, stmtInsertUpsert);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- return compileOnly ? null : jobSpec;
+ return jobSpec;
} catch (Exception e) {
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
@@ -1864,6 +1868,9 @@
throw e;
}
};
+ if (compileOnly) {
+ return compiler.compile();
+ }
if (stmtInsertUpsert.getReturnExpression() != null) {
deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats);
@@ -1872,17 +1879,18 @@
try {
final JobSpecification jobSpec = compiler.compile();
if (jobSpec == null) {
- return;
+ return jobSpec;
}
JobUtils.runJob(hcc, jobSpec, true);
} finally {
locker.unlock();
}
}
+ return null;
}
- public void handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc)
- throws Exception {
+ public JobSpecification handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, boolean compileOnly) throws Exception {
DeleteStatement stmtDelete = (DeleteStatement) stmt;
String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
@@ -1903,9 +1911,10 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- if (jobSpec != null) {
+ if (jobSpec != null && !compileOnly) {
JobUtils.runJob(hcc, jobSpec, true);
}
+ return jobSpec;
} catch (Exception e) {
if (bActiveTxn) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 55cd304..d55fde5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -278,7 +278,7 @@
// prepare job spec(s) that would disconnect any active feeds involving the dataset.
IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
for (IActiveEntityEventsListener listener : activeListeners) {
- if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
+ if (listener.isEntityActive() && listener.isEntityUsingDataset(dataverseName, datasetName)) {
throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET,
RecordUtil.toFullyQualifiedName(dataverseName, datasetName),
listener.getEntityId().toString());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 780e205..aa292f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -36,6 +36,8 @@
GET_JOB_STATUS,
GET_JOB_INFO,
START_JOB,
+ DISTRIBUTE_JOB,
+ DESTROY_JOB,
GET_DATASET_DIRECTORY_SERIVICE_INFO,
GET_DATASET_RESULT_STATUS,
GET_DATASET_RESULT_LOCATIONS,
@@ -101,6 +103,44 @@
}
}
+ public static class DistributeJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final byte[] acggfBytes;
+
+ public DistributeJobFunction(byte[] acggfBytes) {
+ this.acggfBytes = acggfBytes;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DISTRIBUTE_JOB;
+ }
+
+ public byte[] getACGGFBytes() {
+ return acggfBytes;
+ }
+ }
+
+ public static class DestroyJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public DestroyJobFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_JOB;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
public static class StartJobFunction extends Function {
private static final long serialVersionUID = 1L;
@@ -116,8 +156,8 @@
this.jobId = jobId;
}
- public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) {
- this(null, acggfBytes, jobFlags, jobId);
+ public StartJobFunction(JobId jobId) {
+ this(null, null, null, jobId);
}
public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index c049007..8e7affb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -69,9 +69,9 @@
}
@Override
- public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception {
+ public JobId startJob(JobId jobId) throws Exception {
HyracksClientInterfaceFunctions.StartJobFunction sjf =
- new HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags, jobId);
+ new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
return (JobId) rpci.call(ipcHandle, sjf);
}
@@ -83,6 +83,20 @@
}
@Override
+ public JobId distributeJob(byte[] acggfBytes) throws Exception {
+ HyracksClientInterfaceFunctions.DistributeJobFunction sjf =
+ new HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes);
+ return (JobId) rpci.call(ipcHandle, sjf);
+ }
+
+ @Override
+ public JobId destroyJob(JobId jobId) throws Exception {
+ HyracksClientInterfaceFunctions.DestroyJobFunction sjf =
+ new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId);
+ return (JobId) rpci.call(ipcHandle, sjf);
+ }
+
+ @Override
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf =
new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 008a640..5da1f34 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -103,19 +103,28 @@
}
@Override
- public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception {
+ public JobId distributeJob(JobSpecification jobSpec) throws Exception {
JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
- return startJob(jsacggf, jobFlags, jobId);
+ return distributeJob(jsacggf);
+ }
+
+ @Override
+ public JobId destroyJob(JobId jobId) throws Exception {
+ return hci.destroyJob(jobId);
+ }
+
+ @Override
+ public JobId startJob(JobId jobId) throws Exception {
+ return hci.startJob(jobId);
}
public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
}
- public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags, JobId jobId)
- throws Exception {
- return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags, jobId);
+ public JobId distributeJob(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
+ return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
}
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index c4eba3d..e65cacd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -77,17 +77,33 @@
public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
/**
- * Start the specified Job.
+ * Distribute the specified Job.
*
* @param jobSpec
* Job Specification
* @param jobFlags
* Flags
- * @param jobId
- * Used to run a pre-distributed job by id (the same value will be returned)
* @throws Exception
*/
- public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception;
+ public JobId distributeJob(JobSpecification jobSpec) throws Exception;
+
+ /**
+ * Destroy the distributed graph for a pre-distributed job
+ *
+ * @param jobId
+ * The id of the predistributed job
+ * @throws Exception
+ */
+ public JobId destroyJob(JobId jobId) throws Exception;
+
+ /**
+ * Used to run a pre-distributed job by id (the same JobId will be returned)
+ *
+ * @param jobId
+ * The id of the predistributed job
+ * @throws Exception
+ */
+ public JobId startJob(JobId jobId) throws Exception;
/**
* Start the specified Job.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 39063c6..f7995d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -38,7 +38,11 @@
public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
- public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception;
+ public JobId startJob(JobId jobId) throws Exception;
+
+ public JobId distributeJob(byte[] acggfBytes) throws Exception;
+
+ public JobId destroyJob(JobId jobId) throws Exception;
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 8b24cc2..3d99cdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -55,6 +55,9 @@
public static final int INCONSISTENT_RESULT_METADATA = 18;
public static final int CANNOT_TRUNCATE_OR_DELETE_FILE = 19;
public static final int NOT_A_JOBID = 20;
+ public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21;
+ public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
+ public static final int DISTRIBUTED_JOB_FAILURE = 23;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
index cca4a13..30ffebe 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
@@ -21,7 +21,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
public interface IJobLifecycleListener {
- public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException;
+ public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException;
public void notifyJobStart(JobId jobId) throws HyracksException;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index 1656c51..a33c6c9a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -19,6 +19,5 @@
package org.apache.hyracks.api.job;
public enum JobFlag {
- PROFILE_RUNTIME,
- STORE_JOB
+ PROFILE_RUNTIME
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 2abca66..7f90c35 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -39,5 +39,8 @@
18 = Inconsistent metadata for result set %1$s"
19 = Can't truncate or delete the file: %1$s
20 = '%1$s' is not a valid job id.
+21 = The distributed job %1$s was not found
+22 = The distributed job %1$s already exists
+23 = The distributed work failed for %1$s at %2$s
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 7ea5f70..265d3ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -27,10 +27,11 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
+import org.apache.hyracks.control.cc.work.DestroyJobWork;
+import org.apache.hyracks.control.cc.work.DistributeJobWork;
import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import org.apache.hyracks.control.cc.work.GetJobInfoWork;
import org.apache.hyracks.control.cc.work.GetJobStatusWork;
@@ -81,18 +82,34 @@
ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs.getJobManager(), gjif.getJobId(),
new IPCResponder<JobInfo>(handle, mid)));
break;
+ case DISTRIBUTE_JOB:
+ HyracksClientInterfaceFunctions.DistributeJobFunction djf =
+ (HyracksClientInterfaceFunctions.DistributeJobFunction) fn;
+ ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory.create(),
+ new IPCResponder<JobId>(handle, mid)));
+ break;
+ case DESTROY_JOB:
+ HyracksClientInterfaceFunctions.DestroyJobFunction dsjf =
+ (HyracksClientInterfaceFunctions.DestroyJobFunction) fn;
+ ccs.getWorkQueue()
+ .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid)));
+ break;
case START_JOB:
HyracksClientInterfaceFunctions.StartJobFunction sjf =
(HyracksClientInterfaceFunctions.StartJobFunction) fn;
JobId jobId = sjf.getJobId();
byte[] acggfBytes = null;
+ boolean predistributed = false;
if (jobId == null) {
+ //The job is new
jobId = jobIdFactory.create();
+ acggfBytes = sjf.getACGGFBytes();
+ } else {
+ //The job has been predistributed. We don't need to send an ActivityClusterGraph
+ predistributed = true;
}
- //TODO: only send these when the jobId is null
- acggfBytes = sjf.getACGGFBytes();
ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
- jobId, new IPCResponder<JobId>(handle, mid)));
+ jobId, new IPCResponder<JobId>(handle, mid), predistributed));
break;
case GET_DATASET_DIRECTORY_SERIVICE_INFO:
ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs,
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 21fcf92..53d7620 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
+import org.apache.hyracks.control.cc.work.DistributedJobFailureWork;
import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
@@ -99,6 +100,11 @@
ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(),
ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
break;
+ case DISTRIBUTED_JOB_FAILURE:
+ CCNCFunctions.ReportDistributedJobFailureFunction rdjf =
+ (CCNCFunctions.ReportDistributedJobFailureFunction) fn;
+ ccs.getWorkQueue().schedule(new DistributedJobFailureWork(rdjf.getJobId(), rdjf.getNodeId()));
+ break;
case REGISTER_PARTITION_PROVIDER:
CCNCFunctions.RegisterPartitionProviderFunction rppf =
(CCNCFunctions.RegisterPartitionProviderFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 37c4177..346f934 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -44,6 +44,7 @@
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.service.IControllerService;
@@ -90,14 +91,16 @@
private final LogFile jobLog;
- private final ServerContext serverCtx;
+ private ServerContext serverCtx;
- private final WebServer webServer;
+ private WebServer webServer;
private ClusterControllerInfo info;
private CCApplicationContext appCtx;
+ private final PreDistributedJobStore preDistributedJobStore = new PreDistributedJobStore();
+
private final WorkQueue workQueue;
private ExecutorService executor;
@@ -130,14 +133,6 @@
this.ccConfig = ccConfig;
File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
jobLog = new LogFile(jobLogFolder);
- serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
- IIPCI ccIPCI = new ClusterControllerIPCI(this);
- clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
- new CCNCFunctions.SerializerDeserializer());
- IIPCI ciIPCI = new ClientInterfaceIPCI(this);
- clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
- new JavaSerializationBasedPayloadSerializerDeserializer());
- webServer = new WebServer(this);
// WorkQueue is in charge of heartbeat as well as other events.
workQueue = new WorkQueue("ClusterController", Thread.MAX_PRIORITY);
@@ -171,6 +166,14 @@
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
+ serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
+ IIPCI ccIPCI = new ClusterControllerIPCI(this);
+ clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
+ new CCNCFunctions.SerializerDeserializer());
+ IIPCI ciIPCI = new ClientInterfaceIPCI(this);
+ clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
+ webServer = new WebServer(this);
clusterIPC.start();
clientIPC.start();
webServer.setPort(ccConfig.httpPort);
@@ -313,6 +316,10 @@
return nodeManager;
}
+ public PreDistributedJobStore getPreDistributedJobStore() throws HyracksException {
+ return preDistributedJobStore;
+ }
+
public IResourceManager getResourceManager() {
return resourceManager;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
new file mode 100644
index 0000000..c573ae8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class PreDistributedJobStore {
+
+ private final Map<JobId, PreDistributedJobDescriptor> preDistributedJobDescriptorMap;
+
+ public PreDistributedJobStore() {
+ preDistributedJobDescriptorMap = new Hashtable<>();
+ }
+
+ public void addDistributedJobDescriptor(JobId jobId, ActivityClusterGraph activityClusterGraph,
+ JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints)
+ throws HyracksException {
+ if (preDistributedJobDescriptorMap.get(jobId) != null) {
+ throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+ }
+ PreDistributedJobDescriptor descriptor =
+ new PreDistributedJobDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints);
+ preDistributedJobDescriptorMap.put(jobId, descriptor);
+ }
+
+ public void checkForExistingDistributedJobDescriptor(JobId jobId) throws HyracksException {
+ if (preDistributedJobDescriptorMap.get(jobId) != null) {
+ throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+ }
+ }
+
+ public PreDistributedJobDescriptor getDistributedJobDescriptor(JobId jobId) throws HyracksException {
+ PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
+ if (descriptor == null) {
+ throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+ }
+ return descriptor;
+ }
+
+ public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException {
+ PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
+ if (descriptor == null) {
+ throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+ }
+ preDistributedJobDescriptorMap.remove(jobId);
+ }
+
+ public class PreDistributedJobDescriptor {
+
+ private final ActivityClusterGraph activityClusterGraph;
+
+ private final JobSpecification jobSpecification;
+
+ private final Set<Constraint> activityClusterGraphConstraints;
+
+ private PreDistributedJobDescriptor(ActivityClusterGraph activityClusterGraph,
+ JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) {
+ this.activityClusterGraph = activityClusterGraph;
+ this.jobSpecification = jobSpecification;
+ this.activityClusterGraphConstraints = activityClusterGraphConstraints;
+ }
+
+ public ActivityClusterGraph getActivityClusterGraph() {
+ return activityClusterGraph;
+ }
+
+ public JobSpecification getJobSpecification() {
+ return jobSpecification;
+ }
+
+ public Set<Constraint> getActivityClusterGraphConstraints() {
+ return activityClusterGraphConstraints;
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index e43a59d..77b9b17 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -32,9 +32,9 @@
import org.apache.hyracks.api.application.IClusterLifecycleListener;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.application.ApplicationContext;
@@ -93,10 +93,10 @@
}
}
- public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
+ public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec)
throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
- l.notifyJobCreation(jobId, acggf);
+ l.notifyJobCreation(jobId, spec);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 46a173e..c4cf38d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -38,8 +38,8 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -72,7 +72,7 @@
}
@Override
- public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
+ public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec)
throws HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(getClass().getSimpleName() + " notified of new job " + jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 3eece52..8f7b0cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -31,8 +31,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
@@ -54,7 +52,9 @@
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
+import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.job.Task;
import org.apache.hyracks.control.cc.job.TaskAttempt;
@@ -65,6 +65,7 @@
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+
public class JobExecutor {
private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName());
@@ -74,15 +75,19 @@
private final PartitionConstraintSolver solver;
+ private final boolean predistributed;
+
private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
private final Set<TaskCluster> inProgressTaskClusters;
private final Random random;
- public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints) {
+ public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints,
+ boolean predistributed) {
this.ccs = ccs;
this.jobRun = jobRun;
+ this.predistributed = predistributed;
solver = new PartitionConstraintSolver();
partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
inProgressTaskClusters = new HashSet<TaskCluster>();
@@ -90,6 +95,10 @@
random = new Random();
}
+ public boolean isPredistributed() {
+ return predistributed;
+ }
+
public JobRun getJobRun() {
return jobRun;
}
@@ -475,7 +484,7 @@
jobRun.getConnectorPolicyMap());
INodeManager nodeManager = ccs.getNodeManager();
try {
- byte[] acgBytes = JavaSerializationUtils.serialize(acg);
+ byte[] acgBytes = predistributed ? null : JavaSerializationUtils.serialize(acg);
for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
String nodeId = entry.getKey();
final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index e3f9557..741e3db 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -35,7 +35,6 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
@@ -105,7 +104,7 @@
@Override
public void add(JobRun jobRun) throws HyracksException {
checkJob(jobRun);
- JobSpecification job = jobRun.getActivityClusterGraphFactory().getJobSpecification();
+ JobSpecification job = jobRun.getJobSpecification();
IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
switch (status) {
case QUEUE:
@@ -214,7 +213,7 @@
}
// Releases cluster capacitys occupied by the job.
- JobSpecification job = run.getActivityClusterGraphFactory().getJobSpecification();
+ JobSpecification job = run.getJobSpecification();
jobCapacityController.release(job);
// Picks the next job to execute.
@@ -273,8 +272,10 @@
activeRunMap.put(jobId, run);
CCApplicationContext appCtx = ccs.getApplicationContext();
- IActivityClusterGraphGeneratorFactory acggf = run.getActivityClusterGraphFactory();
- appCtx.notifyJobCreation(jobId, acggf);
+ JobSpecification spec = run.getJobSpecification();
+ if (!run.getExecutor().isPredistributed()) {
+ appCtx.notifyJobCreation(jobId, spec);
+ }
run.setStatus(JobStatus.RUNNING, null);
executeJobInternal(run);
callback.setValue(jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 5682194..3aa9043 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,12 +20,14 @@
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -40,9 +42,11 @@
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.PreDistributedJobStore.PreDistributedJobDescriptor;
import org.apache.hyracks.control.cc.executor.ActivityPartitionDetails;
import org.apache.hyracks.control.cc.executor.JobExecutor;
import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
@@ -59,13 +63,11 @@
private final JobId jobId;
- private final IActivityClusterGraphGeneratorFactory acggf;
-
- private final IActivityClusterGraphGenerator acgg;
+ private final JobSpecification spec;
private final ActivityClusterGraph acg;
- private final JobExecutor scheduler;
+ private JobExecutor scheduler;
private final Set<JobFlag> jobFlags;
@@ -99,17 +101,13 @@
private final IResultCallback<JobId> callback;
- public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
- IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags,
- IResultCallback<JobId> callback) {
+ private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, IResultCallback<JobId> callback,
+ JobSpecification spec, ActivityClusterGraph acg) {
this.deploymentId = deploymentId;
this.jobId = jobId;
- this.acggf = acggf;
- this.acgg = acgg;
- this.acg = acgg.initialize();
- this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints());
this.jobFlags = jobFlags;
- this.callback = callback;
+ this.spec = spec;
+ this.acg = acg;
activityClusterPlanMap = new HashMap<>();
pmm = new PartitionMatchMaker();
participatingNodeIds = new HashSet<>();
@@ -118,18 +116,37 @@
connectorPolicyMap = new HashMap<>();
operatorLocations = new HashMap<>();
createTime = System.currentTimeMillis();
+ this.callback = callback;
+ }
+
+ //Run a Pre-distributed job by passing the JobId
+ public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, IResultCallback<JobId> callback,
+ PreDistributedJobDescriptor distributedJobDescriptor)
+ throws HyracksException {
+ this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class), callback,
+ distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
+ Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
+ this.scheduler = new JobExecutor(ccs, this, constaints, true);
+ }
+
+ //Run a new job by creating an ActivityClusterGraph
+ public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
+ IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags,
+ IResultCallback<JobId> callback) {
+ this(deploymentId, jobId, jobFlags, callback, acggf.getJobSpecification(), acgg.initialize());
+ this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), false);
}
public DeploymentId getDeploymentId() {
return deploymentId;
}
- public JobId getJobId() {
- return jobId;
+ public JobSpecification getJobSpecification() {
+ return spec;
}
- public IActivityClusterGraphGeneratorFactory getActivityClusterGraphFactory() {
- return acggf;
+ public JobId getJobId() {
+ return jobId;
}
public ActivityClusterGraph getActivityClusterGraph() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index eac9800..6cf75bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -68,7 +68,7 @@
Iterator<JobRun> runIterator = jobQueue.iterator();
while (runIterator.hasNext()) {
JobRun run = runIterator.next();
- JobSpecification job = run.getActivityClusterGraphFactory().getJobSpecification();
+ JobSpecification job = run.getJobSpecification();
// Cluster maximum capacity can change over time, thus we have to re-check if the job should be rejected
// or not.
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
new file mode 100644
index 0000000..df98252
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DestroyJobWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+ private final JobId jobId;
+ private final IResultCallback<JobId> callback;
+
+ public DestroyJobWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobId> callback) {
+ this.jobId = jobId;
+ this.ccs = ccs;
+ this.callback = callback;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ ccs.getPreDistributedJobStore().removeDistributedJobDescriptor(jobId);
+ INodeManager nodeManager = ccs.getNodeManager();
+ for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
+ node.getNodeController().destroyJob(jobId);
+ }
+ callback.setValue(jobId);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
new file mode 100644
index 0000000..f0c3303
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import java.util.EnumSet;
+
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DistributeJobWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+ private final byte[] acggfBytes;
+ private final JobId jobId;
+ private final IResultCallback<JobId> callback;
+
+ public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobId jobId,
+ IResultCallback<JobId> callback) {
+ this.jobId = jobId;
+ this.ccs = ccs;
+ this.acggfBytes = acggfBytes;
+ this.callback = callback;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ final CCApplicationContext appCtx = ccs.getApplicationContext();
+ ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId);
+ IActivityClusterGraphGeneratorFactory acggf =
+ (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, appCtx);
+ IActivityClusterGraphGenerator acgg =
+ acggf.createActivityClusterGraphGenerator(jobId, appCtx, EnumSet.noneOf(JobFlag.class));
+ ActivityClusterGraph acg = acgg.initialize();
+ ccs.getPreDistributedJobStore().addDistributedJobDescriptor(jobId, acg, acggf.getJobSpecification(),
+ acgg.getConstraints());
+
+ appCtx.notifyJobCreation(jobId, acggf.getJobSpecification());
+
+ byte[] acgBytes = JavaSerializationUtils.serialize(acg);
+
+ INodeManager nodeManager = ccs.getNodeManager();
+ for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
+ node.getNodeController().distributeJob(jobId, acgBytes);
+ }
+
+ callback.setValue(jobId);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
new file mode 100644
index 0000000..f7fa2a4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DistributedJobFailureWork extends SynchronizableWork {
+ protected final JobId jobId;
+ protected final String nodeId;
+
+ public DistributedJobFailureWork(JobId jobId, String nodeId) {
+ this.jobId = jobId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public void doRun() throws HyracksException {
+ throw HyracksException.create(ErrorCode.DISTRIBUTED_JOB_FAILURE, jobId, nodeId);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index fefd3b6..c608712 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -40,15 +40,17 @@
private final DeploymentId deploymentId;
private final JobId jobId;
private final IResultCallback<JobId> callback;
+ private final boolean predestributed;
public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
- EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback) {
+ EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, boolean predestributed) {
this.deploymentId = deploymentId;
this.jobId = jobId;
this.ccs = ccs;
this.acggfBytes = acggfBytes;
this.jobFlags = jobFlags;
this.callback = callback;
+ this.predestributed = predestributed;
}
@Override
@@ -56,11 +58,21 @@
IJobManager jobManager = ccs.getJobManager();
try {
final CCApplicationContext appCtx = ccs.getApplicationContext();
- IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
- .deserialize(acggfBytes, deploymentId, appCtx);
- IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
- JobRun run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags, callback);
+ JobRun run;
+ if (!predestributed) {
+ //Need to create the ActivityClusterGraph
+ IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
+ .deserialize(acggfBytes, deploymentId, appCtx);
+ IActivityClusterGraphGenerator acgg =
+ acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
+ run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags, callback);
+ } else {
+ //ActivityClusterGraph has already been distributed
+ run = new JobRun(ccs, deploymentId, jobId, callback,
+ ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
+ }
jobManager.add(run);
+
} catch (Exception e) {
callback.setException(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 5e1b856..88b8939 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -65,7 +65,7 @@
// Mocks an immediately executable job.
JobRun run = mockJobRun(id);
JobSpecification job = mock(JobSpecification.class);
- when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+ when(run.getJobSpecification()).thenReturn(job);
when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
// Submits the job.
@@ -81,7 +81,7 @@
// Mocks a deferred job.
JobRun run = mockJobRun(id);
JobSpecification job = mock(JobSpecification.class);
- when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+ when(run.getJobSpecification()).thenReturn(job);
when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
.thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
@@ -97,7 +97,7 @@
try {
JobRun run = mockJobRun(8193);
JobSpecification job = mock(JobSpecification.class);
- when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+ when(run.getJobSpecification()).thenReturn(job);
when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
.thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
jobManager.add(run);
@@ -138,7 +138,7 @@
try {
JobRun run = mockJobRun(1);
JobSpecification job = mock(JobSpecification.class);
- when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+ when(run.getJobSpecification()).thenReturn(job);
when(jobCapacityController.allocate(job))
.thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
jobManager.add(run);
@@ -162,14 +162,14 @@
// A normal run.
JobRun run1 = mockJobRun(1);
JobSpecification job1 = mock(JobSpecification.class);
- when(run1.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job1);
+ when(run1.getJobSpecification()).thenReturn(job1);
when(jobCapacityController.allocate(job1)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
jobManager.add(run1);
// A failure run.
JobRun run2 = mockJobRun(2);
JobSpecification job2 = mock(JobSpecification.class);
- when(run2.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job2);
+ when(run2.getJobSpecification()).thenReturn(job2);
when(jobCapacityController.allocate(job2)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
.thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
jobManager.add(run2);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index a0c0f95..4159594 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -44,6 +44,8 @@
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
throws Exception;
+ public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception;
+
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 5c27a6f..a10f8f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -50,6 +50,10 @@
public void undeployBinary(DeploymentId deploymentId) throws Exception;
+ public void distributeJob(JobId jobId, byte[] planBytes) throws Exception;
+
+ public void destroyJob(JobId jobId) throws Exception;
+
public void dumpState(String stateDumpId) throws Exception;
public void shutdown(boolean terminateNCService) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 4ee34ca..4eb1732 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -100,6 +100,10 @@
SHUTDOWN_REQUEST,
SHUTDOWN_RESPONSE,
+ DISTRIBUTE_JOB,
+ DESTROY_JOB,
+ DISTRIBUTED_JOB_FAILURE,
+
STATE_DUMP_REQUEST,
STATE_DUMP_RESPONSE,
@@ -282,6 +286,31 @@
}
}
+ public static class ReportDistributedJobFailureFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final String nodeId;
+
+ public ReportDistributedJobFailureFunction(JobId jobId, String nodeId) {
+ this.jobId = jobId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DISTRIBUTED_JOB_FAILURE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
public static class NotifyJobletCleanupFunction extends Function {
private static final long serialVersionUID = 1L;
@@ -670,6 +699,51 @@
}
}
+ public static class DistributeJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final byte[] acgBytes;
+
+ public DistributeJobFunction(JobId jobId, byte[] acgBytes) {
+ this.jobId = jobId;
+ this.acgBytes = acgBytes;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DISTRIBUTE_JOB;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public byte[] getacgBytes() {
+ return acgBytes;
+ }
+ }
+
+ public static class DestroyJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public DestroyJobFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_JOB;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
public static class StartTasksFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ac6fc2c..83ef32b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -141,6 +141,13 @@
}
@Override
+ public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception {
+ CCNCFunctions.ReportDistributedJobFailureFunction fn =
+ new CCNCFunctions.ReportDistributedJobFailureFunction(jobId, nodeId);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
public void getNodeControllerInfos() throws Exception {
ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 0d59b8d..2a8464e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -84,6 +84,18 @@
}
@Override
+ public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
+ CCNCFunctions.DistributeJobFunction fn = new CCNCFunctions.DistributeJobFunction(jobId, planBytes);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void destroyJob(JobId jobId) throws Exception {
+ CCNCFunctions.DestroyJobFunction fn = new CCNCFunctions.DestroyJobFunction(jobId);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
public void dumpState(String stateDumpId) throws Exception {
CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId);
ipcHandle.send(-1, dsf, null);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 93ccaa4..e6278be 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -26,6 +26,8 @@
import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
import org.apache.hyracks.control.nc.work.CleanupJobletWork;
import org.apache.hyracks.control.nc.work.DeployBinaryWork;
+import org.apache.hyracks.control.nc.work.DestroyJobWork;
+import org.apache.hyracks.control.nc.work.DistributeJobWork;
import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
import org.apache.hyracks.control.nc.work.StartTasksWork;
import org.apache.hyracks.control.nc.work.StateDumpWork;
@@ -99,6 +101,16 @@
ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId()));
return;
+ case DISTRIBUTE_JOB:
+ CCNCFunctions.DistributeJobFunction djf = (CCNCFunctions.DistributeJobFunction) fn;
+ ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, djf.getJobId(), djf.getacgBytes()));
+ return;
+
+ case DESTROY_JOB:
+ CCNCFunctions.DestroyJobFunction dsjf = (CCNCFunctions.DestroyJobFunction) fn;
+ ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, dsjf.getJobId()));
+ return;
+
case STATE_DUMP_REQUEST:
final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId()));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 19f01c1..bf0ddb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -46,6 +46,8 @@
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobId;
@@ -94,11 +96,11 @@
private final IOManager ioManager;
- private final IPCSystem ipc;
+ private IPCSystem ipc;
- private final PartitionManager partitionManager;
+ private PartitionManager partitionManager;
- private final NetworkManager netManager;
+ private NetworkManager netManager;
private IDatasetPartitionManager datasetPartitionManager;
@@ -155,18 +157,11 @@
public NodeControllerService(NCConfig ncConfig) throws Exception {
this.ncConfig = ncConfig;
id = ncConfig.nodeId;
- ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort),
- new NodeControllerIPCI(this),
- new CCNCFunctions.SerializerDeserializer());
ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.ioDevices));
if (id == null) {
throw new Exception("id not set");
}
- partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
- ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort,
- FullFrameChannelInterfaceFactory.INSTANCE);
lccm = new LifeCycleComponentManager();
workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
@@ -244,7 +239,13 @@
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting NodeControllerService");
+ ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort),
+ new NodeControllerIPCI(this), new CCNCFunctions.SerializerDeserializer());
ipc.start();
+ partitionManager = new PartitionManager(this);
+ netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
+ ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort,
+ FullFrameChannelInterfaceFactory.INSTANCE);
netManager.start();
startApplication();
@@ -365,8 +366,28 @@
return jobletMap;
}
- public Map<JobId, ActivityClusterGraph> getActivityClusterGraphMap() {
- return activityClusterGraphMap;
+ public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException {
+ if (activityClusterGraphMap.get(jobId) != null) {
+ throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+ }
+ activityClusterGraphMap.put(jobId, acg);
+ }
+
+ public void removeActivityClusterGraph(JobId jobId) throws HyracksException {
+ if (activityClusterGraphMap.get(jobId) == null) {
+ throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+ }
+ activityClusterGraphMap.remove(jobId);
+ }
+
+ public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException {
+ if (activityClusterGraphMap.get(jobId) != null) {
+ throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+ }
+ }
+
+ public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException {
+ return activityClusterGraphMap.get(jobId);
}
public NetworkManager getNetworkManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
new file mode 100644
index 0000000..55dd01e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * destroy a pre-distributed job
+ *
+ */
+public class DestroyJobWork extends AbstractWork {
+
+ private final NodeControllerService ncs;
+ private final JobId jobId;
+
+ public DestroyJobWork(NodeControllerService ncs, JobId jobId) {
+ this.ncs = ncs;
+ this.jobId = jobId;
+ }
+
+ @Override
+ public void run() {
+ try {
+ ncs.removeActivityClusterGraph(jobId);
+ } catch (HyracksException e) {
+ try {
+ ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId());
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
new file mode 100644
index 0000000..3a4f6ac
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * pre-distribute a job that can be executed later
+ *
+ */
+public class DistributeJobWork extends AbstractWork {
+
+ private final NodeControllerService ncs;
+ private final byte[] acgBytes;
+ private final JobId jobId;
+
+ public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[] acgBytes) {
+ this.ncs = ncs;
+ this.jobId = jobId;
+ this.acgBytes = acgBytes;
+ }
+
+ @Override
+ public void run() {
+ try {
+ ncs.checkForDuplicateDistributedJob(jobId);
+ ActivityClusterGraph acg =
+ (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getApplicationContext());
+ ncs.storeActivityClusterGraph(jobId, acg);
+ } catch (HyracksException e) {
+ try {
+ ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId());
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
+ }
+
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 803f15a..6cd9fa2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -46,6 +46,7 @@
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.ActivityCluster;
@@ -186,17 +187,12 @@
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji == null) {
- Map<JobId, ActivityClusterGraph> acgMap = ncs.getActivityClusterGraphMap();
- ActivityClusterGraph acg = acgMap.get(jobId);
+ ActivityClusterGraph acg = ncs.getActivityClusterGraph(jobId);
if (acg == null) {
if (acgBytes == null) {
- throw new HyracksException("Joblet was not found. This job was most likely aborted.");
+ throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
}
acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
- if (flags.contains(JobFlag.STORE_JOB)) {
- //TODO: Right now the map is append-only
- acgMap.put(jobId, acg);
- }
}
ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
jobletMap.put(jobId, ji);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index b51a578..a7677f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -63,10 +63,10 @@
public static final String NC1_ID = "nc1";
public static final String NC2_ID = "nc2";
- private static ClusterControllerService cc;
+ protected static ClusterControllerService cc;
protected static NodeControllerService nc1;
protected static NodeControllerService nc2;
- private static IHyracksClientConnection hcc;
+ protected static IHyracksClientConnection hcc;
private final List<File> outputFiles;
private static AtomicInteger aInteger = new AtomicInteger(0);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index efbb9d2..160336a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -114,6 +114,11 @@
@Test
public void optimizedSortMergeTest02() throws Exception {
+ JobSpecification spec = createSortMergeJobSpec();
+ runTest(spec);
+ }
+
+ public static JobSpecification createSortMergeJobSpec() throws Exception {
JobSpecification spec = new JobSpecification();
FileSplit[] ordersSplits = new FileSplit[] {
@@ -156,19 +161,17 @@
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
- spec.connect(
- new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(new int[] {
- 1, 0 }, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
- new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, filter, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+ new int[] { 1, 0 },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+ new int[] { 1, 0 },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, filter, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), filter, 0, printer, 0);
-
- runTest(spec);
+ return spec;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
new file mode 100644
index 0000000..2509515
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+
+import java.io.File;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class PredistributedJobsTest {
+ private static final Logger LOGGER = Logger.getLogger(PredistributedJobsTest.class.getName());
+
+ private static final String NC1_ID = "nc1";
+ private static final String NC2_ID = "nc2";
+
+ private static ClusterControllerService cc;
+ private static NodeControllerService nc1;
+ private static NodeControllerService nc2;
+ private static IHyracksClientConnection hcc;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clientNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetPort = 39000;
+ ccConfig.clusterNetIpAddress = "127.0.0.1";
+ ccConfig.clusterNetPort = 39001;
+ ccConfig.profileDumpPeriod = 10000;
+ FileUtils.deleteQuietly(new File("target" + File.separator + "data"));
+ FileUtils.copyDirectory(new File("data"), new File("target" + File.separator + "data"));
+ File outDir = new File("target" + File.separator + "ClusterController");
+ outDir.mkdirs();
+ File ccRoot = File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir);
+ ccRoot.delete();
+ ccRoot.mkdir();
+ ccConfig.ccRoot = ccRoot.getAbsolutePath();
+ ClusterControllerService ccBase = new ClusterControllerService(ccConfig);
+ cc = Mockito.spy(ccBase);
+ cc.start();
+
+ NCConfig ncConfig1 = new NCConfig();
+ ncConfig1.ccHost = "localhost";
+ ncConfig1.ccPort = 39001;
+ ncConfig1.clusterNetIPAddress = "127.0.0.1";
+ ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.resultIPAddress = "127.0.0.1";
+ ncConfig1.nodeId = NC1_ID;
+ ncConfig1.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data"
+ + File.separator + "device0";
+ NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
+ nc1 = Mockito.spy(nc1Base);
+ nc1.start();
+
+ NCConfig ncConfig2 = new NCConfig();
+ ncConfig2.ccHost = "localhost";
+ ncConfig2.ccPort = 39001;
+ ncConfig2.clusterNetIPAddress = "127.0.0.1";
+ ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.resultIPAddress = "127.0.0.1";
+ ncConfig2.nodeId = NC2_ID;
+ ncConfig2.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data"
+ + File.separator + "device1";
+ NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
+ nc2 = Mockito.spy(nc2Base);
+ nc2.start();
+
+ hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+ }
+ }
+
+ @Test
+ public void DistributedTest() throws Exception {
+ JobSpecification spec1 = UnionTest.createUnionJobSpec();
+ JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec();
+
+ //distribute both jobs
+ JobId jobId1 = hcc.distributeJob(spec1);
+ JobId jobId2 = hcc.distributeJob(spec2);
+
+ //make sure it finished
+ //cc will get the store once to check for duplicate insertion and once to insert per job
+ verify(cc, Mockito.timeout(5000).times(4)).getPreDistributedJobStore();
+ verify(nc1, Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
+ verify(nc2, Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
+ verify(nc1, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
+ verify(nc2, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
+
+ //confirm that both jobs are distributed
+ Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) != null && nc2.getActivityClusterGraph(jobId1) != null);
+ Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) != null && nc2.getActivityClusterGraph(jobId2) != null);
+ Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId1) != null);
+ Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2) != null);
+
+ //run the first job
+ hcc.startJob(jobId1);
+ hcc.waitForCompletion(jobId1);
+
+ //destroy the first job
+ hcc.destroyJob(jobId1);
+
+ //make sure it finished
+ verify(cc, Mockito.timeout(5000).times(8)).getPreDistributedJobStore();
+ verify(nc1, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
+ verify(nc2, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
+
+ //confirm the first job is destroyed
+ Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) == null && nc2.getActivityClusterGraph(jobId1) == null);
+ cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1);
+
+ //run the second job
+ hcc.startJob(jobId2);
+ hcc.waitForCompletion(jobId2);
+
+ //run the second job again
+ hcc.startJob(jobId2);
+ hcc.waitForCompletion(jobId2);
+
+ //destroy the second job
+ hcc.destroyJob(jobId2);
+
+ //make sure it finished
+ verify(cc, Mockito.timeout(5000).times(12)).getPreDistributedJobStore();
+ verify(nc1, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
+ verify(nc2, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
+
+ //confirm the second job is destroyed
+ Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) == null && nc2.getActivityClusterGraph(jobId2) == null);
+ cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId2);
+ }
+
+ @AfterClass
+ public static void deinit() throws Exception {
+ nc2.stop();
+ nc1.stop();
+ cc.stop();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
index 02cab8f..542f037 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
@@ -44,6 +44,11 @@
public class UnionTest extends AbstractIntegrationTest {
@Test
public void union01() throws Exception {
+ JobSpecification spec = createUnionJobSpec();
+ runTest(spec);
+ }
+
+ public static JobSpecification createUnionJobSpec() throws Exception {
JobSpecification spec = new JobSpecification();
IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
@@ -82,6 +87,6 @@
spec.connect(new OneToOneConnectorDescriptor(spec), unionAll, 0, printer, 0);
spec.addRoot(printer);
- runTest(spec);
+ return spec;
}
}