Extensible AsterixHyracksIntegrationUtil
Change-Id: I52bb7c4059cd0c808e404619f9bcc02cb3eafd73
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1083
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index cc50b75..3dbc310 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,13 +19,15 @@
package org.apache.asterix.api.common;
import java.io.File;
+import java.net.Inet4Address;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import org.apache.asterix.common.config.AsterixPropertiesAccessor;
import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
import org.apache.commons.io.FileUtils;
@@ -45,75 +47,28 @@
public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
- public static ClusterControllerService cc;
- public static NodeControllerService[] ncs;
- public static IHyracksClientConnection hcc;
+ public ClusterControllerService cc;
+ public NodeControllerService[] ncs;
+ public IHyracksClientConnection hcc;
- private static AsterixPropertiesAccessor propertiesAccessor;
+ private AsterixPropertiesAccessor propertiesAccessor;
- public static void init(boolean deleteOldInstanceData) throws Exception {
+ public void init(boolean deleteOldInstanceData) throws Exception {
propertiesAccessor = new AsterixPropertiesAccessor();
- ncs = new NodeControllerService[propertiesAccessor.getNodeNames().size()];
if (deleteOldInstanceData) {
deleteTransactionLogs();
removeTestStorageFiles();
}
- CCConfig ccConfig = new CCConfig();
- ccConfig.clusterNetIpAddress = "127.0.0.1";
- ccConfig.clientNetIpAddress = "127.0.0.1";
- ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
- ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
- ccConfig.defaultMaxJobAttempts = 0;
- ccConfig.resultTTL = 30000;
- ccConfig.resultSweepThreshold = 1000;
- ccConfig.appCCMainClass = CCApplicationEntryPoint.class.getName();
- // ccConfig.useJOL = true;
- cc = new ClusterControllerService(ccConfig);
+ cc = new ClusterControllerService(createCCConfig());
cc.start();
// Starts ncs.
- int n = 0;
List<String> nodes = propertiesAccessor.getNodeNames();
+ List<NodeControllerService> nodeControllers = new ArrayList<>();
for (String ncName : nodes) {
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
- ncConfig1.clusterNetIPAddress = "127.0.0.1";
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.resultIPAddress = "127.0.0.1";
- ncConfig1.nodeId = ncName;
- ncConfig1.resultTTL = 30000;
- ncConfig1.resultSweepThreshold = 1000;
- ncConfig1.appArgs = Arrays.asList("-virtual-NC");
- String tempPath = System.getProperty(IO_DIR_KEY);
- if (tempPath.endsWith(File.separator)) {
- tempPath = tempPath.substring(0, tempPath.length() - 1);
- }
- System.err.println("Using the path: " + tempPath);
- // get initial partitions from properties
- String[] nodeStores = propertiesAccessor.getStores().get(ncName);
- if (nodeStores == null) {
- throw new Exception("Coudn't find stores for NC: " + ncName);
- }
- String tempDirPath = System.getProperty(IO_DIR_KEY);
- if (!tempDirPath.endsWith(File.separator)) {
- tempDirPath += File.separator;
- }
- for (int p = 0; p < nodeStores.length; p++) {
- // create IO devices based on stores
- String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
- File ioDeviceDir = new File(iodevicePath);
- ioDeviceDir.mkdirs();
- if (p == 0) {
- ncConfig1.ioDevices = iodevicePath;
- } else {
- ncConfig1.ioDevices += "," + iodevicePath;
- }
- }
- ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
- NodeControllerService nodeControllerService = new NodeControllerService(ncConfig1);
- ncs[n] = nodeControllerService;
+ NodeControllerService nodeControllerService = new NodeControllerService(createNCConfig(ncName));
+ nodeControllers.add(nodeControllerService);
Thread ncStartThread = new Thread() {
@Override
public void run() {
@@ -125,24 +80,76 @@
}
};
ncStartThread.start();
- ++n;
}
hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort);
+ ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]);
}
- public static String[] getNcNames() {
+ protected CCConfig createCCConfig() {
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clusterNetIpAddress = Inet4Address.getLoopbackAddress().getHostAddress();
+ ccConfig.clientNetIpAddress = Inet4Address.getLoopbackAddress().getHostAddress();
+ ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
+ ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
+ ccConfig.defaultMaxJobAttempts = 0;
+ ccConfig.resultTTL = 30000;
+ ccConfig.resultSweepThreshold = 1000;
+ ccConfig.appCCMainClass = CCApplicationEntryPoint.class.getName();
+ return ccConfig;
+ }
+
+ protected NCConfig createNCConfig(String ncName) throws AsterixException {
+ NCConfig ncConfig = new NCConfig();
+ ncConfig.ccHost = "localhost";
+ ncConfig.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
+ ncConfig.clusterNetIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
+ ncConfig.dataIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
+ ncConfig.resultIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
+ ncConfig.nodeId = ncName;
+ ncConfig.resultTTL = 30000;
+ ncConfig.resultSweepThreshold = 1000;
+ ncConfig.appArgs = Collections.singletonList("-virtual-NC");
+ String tempPath = System.getProperty(IO_DIR_KEY);
+ if (tempPath.endsWith(File.separator)) {
+ tempPath = tempPath.substring(0, tempPath.length() - 1);
+ }
+ System.err.println("Using the path: " + tempPath);
+ // get initial partitions from properties
+ String[] nodeStores = propertiesAccessor.getStores().get(ncName);
+ if (nodeStores == null) {
+ throw new AsterixException("Coudn't find stores for NC: " + ncName);
+ }
+ String tempDirPath = System.getProperty(IO_DIR_KEY);
+ if (!tempDirPath.endsWith(File.separator)) {
+ tempDirPath += File.separator;
+ }
+ for (int p = 0; p < nodeStores.length; p++) {
+ // create IO devices based on stores
+ String iodevicePath = tempDirPath + ncConfig.nodeId + File.separator + nodeStores[p];
+ File ioDeviceDir = new File(iodevicePath);
+ ioDeviceDir.mkdirs();
+ if (p == 0) {
+ ncConfig.ioDevices = iodevicePath;
+ } else {
+ ncConfig.ioDevices += "," + iodevicePath;
+ }
+ }
+ ncConfig.appNCMainClass = NCApplicationEntryPoint.class.getName();
+ return ncConfig;
+ }
+
+ public String[] getNcNames() {
return propertiesAccessor.getNodeNames().toArray(new String[propertiesAccessor.getNodeNames().size()]);
}
- public static IHyracksClientConnection getHyracksClientConnection() {
+ public IHyracksClientConnection getHyracksClientConnection() {
return hcc;
}
- public static void deinit(boolean deleteOldInstanceData) throws Exception {
+ public void deinit(boolean deleteOldInstanceData) throws Exception {
//stop NCs
ArrayList<Thread> stopNCThreads = new ArrayList<>();
- for (int n = 0; n < ncs.length; ++n) {
- NodeControllerService nodeControllerService = ncs[n];
+ for (NodeControllerService nodeControllerService : ncs) {
if (nodeControllerService != null) {
Thread ncStopThread = new Thread() {
@Override
@@ -174,14 +181,14 @@
}
}
- public static void runJob(JobSpecification spec) throws Exception {
+ public void runJob(JobSpecification spec) throws Exception {
GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString());
JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
GlobalConfig.ASTERIX_LOGGER.info(jobId.toString());
hcc.waitForCompletion(jobId);
}
- public static void removeTestStorageFiles() {
+ public void removeTestStorageFiles() {
File dir = new File(System.getProperty(IO_DIR_KEY));
for (String ncName : propertiesAccessor.getNodeNames()) {
File ncDir = new File(dir, ncName);
@@ -189,7 +196,7 @@
}
}
- private static void deleteTransactionLogs() throws Exception {
+ private void deleteTransactionLogs() throws Exception {
for (String ncId : propertiesAccessor.getNodeNames()) {
File log = new File(propertiesAccessor.getTransactionLogDirs().get(ncId));
if (log.exists()) {
@@ -206,11 +213,12 @@
* unused
*/
public static void main(String[] args) {
+ AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
- deinit(false);
+ integrationUtil.deinit(false);
} catch (Exception e) {
e.printStackTrace();
}
@@ -219,7 +227,7 @@
try {
System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
- init(false);
+ integrationUtil.init(false);
while (true) {
Thread.sleep(10000);
}
@@ -228,5 +236,4 @@
System.exit(1);
}
}
-
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
index cc4b40c..821822f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
@@ -45,6 +45,8 @@
public List<String> args;
}
+ private static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
public static void main(String args[]) throws Exception {
Options options = new Options();
CmdLineParser parser = new CmdLineParser(options);
@@ -55,7 +57,7 @@
for (String queryFile : options.args) {
Reader in = new FileReader(queryFile);
AsterixJavaClient ajc = new AsterixJavaClient(
- AsterixHyracksIntegrationUtil.getHyracksClientConnection(), in, compilationProvider);
+ integrationUtil.getHyracksClientConnection(), in, compilationProvider);
try {
ajc.compile(true, false, false, false, false, true, false);
} finally {
@@ -80,11 +82,11 @@
File lsn = new File("last_checkpoint_lsn");
lsn.deleteOnExit();
- AsterixHyracksIntegrationUtil.init(false);
+ integrationUtil.init(false);
}
public static void tearDown() throws Exception {
- AsterixHyracksIntegrationUtil.deinit(false);
+ integrationUtil.deinit(false);
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 5c3aefe..596dc41 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -133,7 +133,7 @@
}
jobletCtx = Mockito.mock(IHyracksJobletContext.class);
Mockito.when(jobletCtx.getApplicationContext())
- .thenReturn(AsterixHyracksIntegrationUtil.ncs[0].getApplicationContext());
+ .thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext());
Mockito.when(jobletCtx.getJobId()).thenAnswer(new Answer<JobId>() {
@Override
public JobId answer(InvocationOnMock invocation) throws Throwable {
@@ -248,7 +248,7 @@
}
public ConstantFileSplitProvider getFileSplitProvider(Dataset dataset) {
- FileSplit fileSplit = new FileSplit(AsterixHyracksIntegrationUtil.ncs[0].getId(),
+ FileSplit fileSplit = new FileSplit(ExecutionTestUtil.integrationUtil.ncs[0].getId(),
dataset.getDataverseName() + File.separator + dataset.getDatasetName());
return new ConstantFileSplitProvider(new FileSplit[] { fileSplit });
}
@@ -367,12 +367,12 @@
ctx = Mockito.spy(ctx);
Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
Mockito.when(ctx.getIOManager())
- .thenReturn(AsterixHyracksIntegrationUtil.ncs[0].getRootContext().getIOManager());
+ .thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getRootContext().getIOManager());
return ctx;
}
public TransactionSubsystem getTransactionSubsystem() {
- return (TransactionSubsystem) ((AsterixAppRuntimeContext) AsterixHyracksIntegrationUtil.ncs[0]
+ return (TransactionSubsystem) ((AsterixAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0]
.getApplicationContext().getApplicationObject()).getTransactionSubsystem();
}
@@ -381,7 +381,7 @@
}
public AsterixAppRuntimeContext getAppRuntimeContext() {
- return (AsterixAppRuntimeContext) AsterixHyracksIntegrationUtil.ncs[0].getApplicationContext()
+ return (AsterixAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext()
.getApplicationObject();
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
index 0a22f1c..84fb2b3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
@@ -46,6 +46,8 @@
private static final PrintWriter ERR = new PrintWriter(System.err);
private final TestExecutor testExecutor = new TestExecutor();
+ private static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
@Test
public void enlistTest() throws Exception {
File outdir = new File(PATH_ACTUAL);
@@ -54,11 +56,11 @@
}
outdir.mkdirs();
- AsterixHyracksIntegrationUtil.init(true);
+ integrationUtil.init(true);
Reader loadReader = new BufferedReader(
new InputStreamReader(new FileInputStream(LOAD_FOR_ENLIST_FILE), "UTF-8"));
AsterixJavaClient asterixLoad = new AsterixJavaClient(
- AsterixHyracksIntegrationUtil.getHyracksClientConnection(), loadReader, ERR,
+ integrationUtil.getHyracksClientConnection(), loadReader, ERR,
new AqlCompilationProvider());
try {
asterixLoad.compile(true, false, false, false, false, true, false);
@@ -69,7 +71,7 @@
}
asterixLoad.execute();
- AsterixHyracksIntegrationUtil.deinit(true);
+ integrationUtil.deinit(true);
for (String d : ASTERIX_DATA_DIRS) {
testExecutor.deleteRec(new File(d));
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
index 4089646..cbe6e85 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
@@ -48,18 +48,19 @@
private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
private static final TestExecutor testExecutor = new TestExecutor();
+ private static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
@BeforeClass
public static void setUp() throws Exception {
System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
File outdir = new File(PATH_ACTUAL);
outdir.mkdirs();
- AsterixHyracksIntegrationUtil.init(true);
+ integrationUtil.init(true);
}
@AfterClass
public static void tearDown() throws Exception {
- AsterixHyracksIntegrationUtil.deinit(true);
+ integrationUtil.deinit(true);
File outdir = new File(PATH_ACTUAL);
File[] files = outdir.listFiles();
if (files == null || files.length == 0) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
index d0be428..b29c0b0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
@@ -73,6 +73,8 @@
private static final ILangCompilationProvider aqlCompilationProvider = new AqlCompilationProvider();
private static final ILangCompilationProvider sqlppCompilationProvider = new SqlppCompilationProvider();
+ private static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
@BeforeClass
public static void setUp() throws Exception {
System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
@@ -81,7 +83,7 @@
HDFSCluster.getInstance().setup();
- AsterixHyracksIntegrationUtil.init(true);
+ integrationUtil.init(true);
// Set the node resolver to be the identity resolver that expects node names
// to be node controller ids; a valid assumption in test environment.
System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
@@ -98,7 +100,7 @@
HDFSCluster.getInstance().cleanup();
- AsterixHyracksIntegrationUtil.deinit(true);
+ integrationUtil.deinit(true);
}
private static void suiteBuildPerFile(File file, Collection<Object[]> testArgs, String path) {
@@ -168,7 +170,7 @@
PrintWriter plan = new PrintWriter(actualFile);
ILangCompilationProvider provider = queryFile.getName().endsWith("aql") ? aqlCompilationProvider
: sqlppCompilationProvider;
- IHyracksClientConnection hcc = AsterixHyracksIntegrationUtil.getHyracksClientConnection();
+ IHyracksClientConnection hcc = integrationUtil.getHyracksClientConnection();
AsterixJavaClient asterix = new AsterixJavaClient(hcc, query, plan, provider);
try {
asterix.compile(true, false, false, true, true, false, false);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 813def3..545faed 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -46,6 +46,8 @@
protected static TestGroup FailedGroup;
+ public static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
public static List<ILibraryManager> setUp(boolean cleanup) throws Exception {
System.out.println("Starting setup");
if (LOGGER.isLoggable(Level.INFO)) {
@@ -56,7 +58,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("initializing pseudo cluster");
}
- AsterixHyracksIntegrationUtil.init(cleanup);
+ integrationUtil.init(cleanup);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("initializing HDFS");
@@ -77,7 +79,7 @@
// Adds the library manager for CC.
libraryManagers.add(AsterixAppContextInfo.getInstance().getLibraryManager());
// Adds library managers for NCs, one-per-NC.
- for (NodeControllerService nc : AsterixHyracksIntegrationUtil.ncs) {
+ for (NodeControllerService nc : integrationUtil.ncs) {
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext()
.getApplicationObject();
libraryManagers.add(runtimeCtx.getLibraryManager());
@@ -87,7 +89,7 @@
public static void tearDown(boolean cleanup) throws Exception {
// validateBufferCacheState(); <-- Commented out until bug is fixed -->
- AsterixHyracksIntegrationUtil.deinit(cleanup);
+ integrationUtil.deinit(cleanup);
File outdir = new File(PATH_ACTUAL);
File[] files = outdir.listFiles();
if (files == null || files.length == 0) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
index b827a0d..84542d0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
@@ -67,7 +67,7 @@
@AfterClass
public static void tearDown() throws Exception {
ExecutionTestUtil.tearDown(cleanupOnStop);
- AsterixHyracksIntegrationUtil.removeTestStorageFiles();
+ ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
}
@Parameters(name = "SqlppExecutionTest {index}: {0}")