merge fullstack_dynamic_deployment--batch 4
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 48dfb1c..0467eae 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -181,74 +181,6 @@
}
}
- public static class GetDatasetDirectoryServiceInfoFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.GET_DATASET_DIRECTORY_SERIVICE_INFO;
- }
- }
-
- public static class GetDatasetResultStatusFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- public GetDatasetResultStatusFunction(JobId jobId, ResultSetId rsId) {
- this.jobId = jobId;
- this.rsId = rsId;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.GET_DATASET_RESULT_STATUS;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public ResultSetId getResultSetId() {
- return rsId;
- }
- }
-
- public static class GetDatasetResultLocationsFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final DatasetDirectoryRecord[] knownRecords;
-
- public GetDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords) {
- this.jobId = jobId;
- this.rsId = rsId;
- this.knownRecords = knownRecords;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.GET_DATASET_RESULT_LOCATIONS;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public ResultSetId getResultSetId() {
- return rsId;
- }
-
- public DatasetDirectoryRecord[] getKnownRecords() {
- return knownRecords;
- }
- }
-
public static class WaitForCompletionFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index da14e9b..eb25b70 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -106,10 +106,6 @@
return hci.getDatasetDirectoryServiceInfo();
}
- public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
- return hci.getDatasetDirectoryServiceInfo();
- }
-
@Override
public void waitForCompletion(JobId jobId) throws Exception {
hci.waitForCompletion(jobId);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 4521149..37bbdfa 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -89,14 +89,6 @@
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
/**
- * Gets the IP Address and port for the DatasetDirectoryService wrapped in NetworkAddress
- *
- * @return {@link NetworkAddress}
- * @throws Exception
- */
- public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
-
- /**
* Waits until the specified job has completed, either successfully or has
* encountered a permanent failure.
*
@@ -176,4 +168,4 @@
public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
EnumSet<JobFlag> jobFlags) throws Exception;
-}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index e1a4185..aabc351 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -35,8 +35,6 @@
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
- public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
-
public void waitForCompletion(JobId jobId) throws Exception;
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 623c54f..9b8a996 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -361,27 +361,6 @@
return;
}
- case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
- workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this,
- new IPCResponder<NetworkAddress>(handle, mid)));
- return;
- }
-
- case GET_DATASET_RESULT_STATUS: {
- HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
- workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(), gdrlf
- .getResultSetId(), new IPCResponder<Status>(handle, mid)));
- return;
- }
-
- case GET_DATASET_RESULT_LOCATIONS: {
- HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
- workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
- .getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
- new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
- return;
- }
-
case WAIT_FOR_COMPLETION: {
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 3c9281a..a309529 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -66,6 +66,7 @@
@Override
public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException {
+ DeploymentId deploymentId = null;
try {
/** add hadoop configurations */
URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
@@ -115,7 +116,7 @@
for (URL url : urls)
if (url.toString().endsWith(".jar"))
jars.add(new File(url.getPath()));
- DeploymentId deploymentId = installApplication(jars);
+ deploymentId = installApplication(jars);
start = System.currentTimeMillis();
FileSystem dfs = FileSystem.get(job.getConfiguration());
@@ -151,8 +152,8 @@
/**
* destroy application if there is any exception
*/
- if (hcc != null) {
- destroyApplication(applicationName);
+ if (hcc != null && deploymentId != null) {
+ hcc.unDeployBinary(deploymentId);
}
} catch (Exception e2) {
throw new HyracksException(e2);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 2173e10..b761f62 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.pregelix.core.util;
-import java.io.File;
import java.util.EnumSet;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
diff --git a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 572bff9..0de7979 100644
--- a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -102,7 +102,7 @@
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
- PregelixHyracksIntegrationUtil.init();
+ PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
@@ -193,8 +193,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -360,8 +360,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -457,8 +457,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -564,8 +564,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index cec5b55..343ba7e 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -85,7 +85,6 @@
@Override
public void initialize() throws HyracksDataException {
- ctxCL = Thread.currentThread().getContextClassLoader();
try {
Configuration conf = confFactory.createConfiguration(ctx);
writer.open();
@@ -107,8 +106,6 @@
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index da6d564..d6d9fe3 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -76,7 +76,7 @@
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
- PregelixHyracksIntegrationUtil.init();
+ PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
LOGGER.info("Hyracks mini-cluster started");
startHDFS();
FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index 87bc40d..14d7718 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -51,7 +51,6 @@
private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
- private static final String FILE_EXTENSION_OF_RESULTS = "result";
private static final String DATA_PATH = "data/webmap/webmap_link.txt";
private static final String HDFS_PATH = "/webmap/";
@@ -78,7 +77,7 @@
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
- PregelixHyracksIntegrationUtil.init();
+ PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
LOGGER.info("Hyracks mini-cluster started");
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));