[NO ISSUE][NET] Move ResultSet from servlet to application context
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Move ResultSet instance ownership from servlet context to
application context
- Add close() methods to HyracksConnection, ResultDirectory,
and ResultSet
- Move CcApplicationContext from asterix-runtime to asterix-app
Change-Id: Id46661bdf62538a901258b5c72c065a3865a0650
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16384
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index 4dadf55..bf77c24 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -28,8 +28,6 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.hyracks.ipc.exceptions.IPCException;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -61,13 +59,7 @@
}
protected IResultSet getResultSet() throws Exception { // NOSONAR
- try {
- return ServletUtil.getResultSet(getHyracksClientConnection(), appCtx, ctx);
- } catch (IPCException e) {
- LOGGER.log(Level.WARN, "Failed getting hyracks dataset connection. Resetting hyracks connection.", e);
- ctx.put(HYRACKS_CONNECTION_ATTR, appCtx.getHcc());
- return ServletUtil.getResultSet(getHyracksClientConnection(), appCtx, ctx);
- }
+ return ServletUtil.getResultSet(appCtx, ctx);
}
protected IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 081c69a..56ad88e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -134,7 +134,7 @@
try {
// TODO: warnings should be retrieved from warnings collectors
IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
- IResultSet resultSet = ServletUtil.getResultSet(hcc, appCtx, ctx);
+ IResultSet resultSet = ServletUtil.getResultSet(appCtx, ctx);
IParser parser = parserFactory.createParser(query);
List<Statement> statements = parser.parse();
SessionConfig sessionConfig = new SessionConfig(format, true, isSet(executeQuery), true, planFormat);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java
index 3ac37bf..8d5e4db 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java
@@ -24,29 +24,22 @@
import java.util.List;
import java.util.Map;
-import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.net.URLCodec;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.result.IResultSet;
-import org.apache.hyracks.client.result.ResultSet;
import org.apache.hyracks.http.api.IServletRequest;
public class ServletUtil {
- static IResultSet getResultSet(IHyracksClientConnection hcc, IApplicationContext appCtx,
- final Map<String, Object> ctx) throws Exception {
+ static IResultSet getResultSet(IApplicationContext appCtx, final Map<String, Object> ctx) throws Exception {
IResultSet resultSet = (IResultSet) ctx.get(RESULTSET_ATTR);
if (resultSet == null) {
synchronized (ctx) {
resultSet = (IResultSet) ctx.get(RESULTSET_ATTR);
if (resultSet == null) {
- resultSet = new ResultSet(hcc,
- appCtx.getServiceContext().getControllerService().getNetworkSecurityManager()
- .getSocketChannelFactory(),
- appCtx.getCompilerProperties().getFrameSize(), ResultReader.NUM_READERS);
+ resultSet = appCtx.getResultSet();
ctx.put(RESULTSET_ATTR, resultSet);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
similarity index 87%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index 66f0e73..880880e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.utils;
+package org.apache.asterix.app.cc;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
+import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IConfigValidator;
import org.apache.asterix.common.api.IConfigValidatorFactory;
import org.apache.asterix.common.api.ICoordinationService;
@@ -56,13 +57,21 @@
import org.apache.asterix.runtime.compression.CompressionManager;
import org.apache.asterix.runtime.job.listener.NodeJobTracker;
import org.apache.asterix.runtime.transaction.ResourceIdManager;
+import org.apache.asterix.runtime.utils.BulkTxnIdFactory;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.runtime.utils.NoOpCoordinationService;
+import org.apache.asterix.runtime.utils.RequestTracker;
+import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.client.result.ResultSet;
import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.util.NetworkUtil;
/*
* Acts as an holder class for IndexRegistryProvider, AsterixStorageManager
@@ -87,7 +96,8 @@
private MessagingProperties messagingProperties;
private NodeProperties nodeProperties;
private Supplier<IMetadataBootstrap> metadataBootstrapSupplier;
- private IHyracksClientConnection hcc;
+ private volatile HyracksConnection hcc;
+ private volatile ResultSet resultSet;
private Object extensionManager;
private INcLifecycleCoordinator ftStrategy;
private IJobLifecycleListener activeLifeCycleListener;
@@ -103,7 +113,7 @@
private final IAdapterFactoryService adapterFactoryService;
private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
- public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
+ public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hcc,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener,
IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager,
@@ -188,18 +198,44 @@
@Override
public IHyracksClientConnection getHcc() throws HyracksDataException {
- if (!hcc.isConnected()) {
+ HyracksConnection hc = hcc;
+ if (!hc.isConnected()) {
synchronized (this) {
- if (!hcc.isConnected()) {
+ hc = hcc;
+ if (!hc.isConnected()) {
try {
- hcc = new HyracksConnection(hcc.getHost(), hcc.getPort());
+ ResultSet rs = resultSet;
+ resultSet = null;
+ NetworkUtil.closeQuietly(rs);
+
+ NetworkUtil.closeQuietly(hc);
+ hcc = hc = new HyracksConnection(hcc.getHost(), hcc.getPort());
} catch (Exception e) {
throw HyracksDataException.create(e);
}
}
}
}
- return hcc;
+ return hc;
+ }
+
+ @Override
+ public IResultSet getResultSet() throws HyracksDataException {
+ ResultSet rs = resultSet;
+ if (rs == null) {
+ synchronized (this) {
+ rs = resultSet;
+ if (rs == null) {
+ try {
+ resultSet = rs = ResultReader.createResultSet(getHcc(), ccServiceCtx.getControllerService(),
+ compilerProperties);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+ }
+ return rs;
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 1a89168..a46522e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -27,6 +27,7 @@
import java.util.concurrent.ExecutorService;
import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IConfigValidator;
import org.apache.asterix.common.api.IConfigValidatorFactory;
import org.apache.asterix.common.api.ICoordinationService;
@@ -86,6 +87,8 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.network.INetworkSecurityManager;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.client.result.ResultSet;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.ipc.impl.HyracksConnection;
@@ -108,6 +111,7 @@
import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
import org.apache.hyracks.util.MaintainedThreadNameExecutorService;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.hyracks.util.cache.CacheManager;
import org.apache.hyracks.util.cache.ICacheManager;
import org.apache.logging.log4j.Level;
@@ -147,7 +151,8 @@
private final NCExtensionManager ncExtensionManager;
private final IStorageComponentProvider componentProvider;
private final IPersistedResourceRegistry persistedResourceRegistry;
- private IHyracksClientConnection hcc;
+ private volatile HyracksConnection hcc;
+ private volatile ResultSet resultSet;
private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
private IReplicaManager replicaManager;
private IReceptionist receptionist;
@@ -513,15 +518,22 @@
@Override
public IHyracksClientConnection getHcc() throws HyracksDataException {
- if (hcc == null || !hcc.isConnected()) {
+ HyracksConnection hc = hcc;
+ if (hc == null || !hc.isConnected()) {
synchronized (this) {
- if (hcc == null || !hcc.isConnected()) {
+ hc = hcc;
+ if (hc == null || !hc.isConnected()) {
try {
+ ResultSet rs = resultSet;
+ resultSet = null;
+ NetworkUtil.closeQuietly(rs);
+
NodeControllerService ncSrv = (NodeControllerService) ncServiceContext.getControllerService();
// TODO(mblow): multicc
CcId primaryCcId = ncSrv.getPrimaryCcId();
ClusterControllerInfo ccInfo = ncSrv.getNodeParameters(primaryCcId).getClusterControllerInfo();
- hcc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort(),
+ NetworkUtil.closeQuietly(hc);
+ hcc = hc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort(),
ncSrv.getNetworkSecurityManager().getSocketChannelFactory());
} catch (Exception e) {
throw HyracksDataException.create(e);
@@ -529,7 +541,26 @@
}
}
}
- return hcc;
+ return hc;
+ }
+
+ @Override
+ public IResultSet getResultSet() throws HyracksDataException {
+ ResultSet rs = resultSet;
+ if (rs == null) {
+ synchronized (this) {
+ rs = resultSet;
+ if (rs == null) {
+ try {
+ resultSet = rs = ResultReader.createResultSet(getHcc(), ncServiceContext.getControllerService(),
+ compilerProperties);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+ }
+ return rs;
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
index 1acae87..56dfd5e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.app.result;
+import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -27,12 +29,14 @@
import org.apache.hyracks.api.result.IResultSetReader;
import org.apache.hyracks.api.result.ResultJobRecord.Status;
import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.client.result.ResultSet;
import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
public class ResultReader {
- private IResultSetReader reader;
+ private final IResultSetReader reader;
- private IFrameTupleAccessor frameTupleAccessor;
+ private final IFrameTupleAccessor frameTupleAccessor;
// Number of parallel result reader buffers
public static final int NUM_READERS = 1;
@@ -57,4 +61,10 @@
public IResultMetadata getMetadata() {
return reader.getResultMetadata();
}
+
+ public static ResultSet createResultSet(IHyracksClientConnection hcc, IControllerService srv,
+ CompilerProperties compilerProperties) throws Exception {
+ return new ResultSet(hcc, srv.getNetworkSecurityManager().getSocketChannelFactory(),
+ compilerProperties.getFrameSize(), ResultReader.NUM_READERS);
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 8f3deb1..2a66cfd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -53,6 +53,7 @@
import org.apache.asterix.api.http.server.VersionApiServlet;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.cc.CCExtensionManager;
+import org.apache.asterix.app.cc.CcApplicationContext;
import org.apache.asterix.app.config.ConfigValidator;
import org.apache.asterix.app.io.PersistedResourceRegistry;
import org.apache.asterix.app.replication.NcLifecycleCoordinator;
@@ -85,7 +86,6 @@
import org.apache.asterix.metadata.lock.MetadataLockManager;
import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.asterix.runtime.job.resource.JobCapacityController;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.Receptionist;
import org.apache.asterix.util.MetadataBuiltinFunctions;
@@ -126,7 +126,7 @@
protected WebManager webManager;
protected ICcApplicationContext appCtx;
private IJobCapacityController jobCapacityController;
- private IHyracksClientConnection hcc;
+ private HyracksConnection hcc;
@Override
public void init(IServiceContext serviceCtx) throws Exception {
@@ -209,7 +209,7 @@
IReceptionistFactory receptionistFactory, IConfigValidatorFactory configValidatorFactory,
CCExtensionManager ccExtensionManager, IAdapterFactoryService adapterFactoryService)
throws AlgebricksException, IOException {
- return new CcApplicationContext(ccServiceCtx, getHcc(), () -> MetadataManager.INSTANCE, globalRecoveryManager,
+ return new CcApplicationContext(ccServiceCtx, hcc, () -> MetadataManager.INSTANCE, globalRecoveryManager,
lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider, new MetadataLockManager(),
createMetadataLockUtil(), receptionistFactory, configValidatorFactory, ccExtensionManager,
adapterFactoryService);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
index 8dcfa26..eb63218 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
@@ -31,8 +31,8 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.asterix.api.http.server.VersionApiServlet;
+import org.apache.asterix.app.cc.CcApplicationContext;
import org.apache.asterix.common.config.BuildProperties;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 0b80881..34696b1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.app.cc.CcApplicationContext;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.app.nc.TransactionSubsystem;
import org.apache.asterix.common.config.TransactionProperties;
@@ -61,7 +62,6 @@
import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorNodePushable;
import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
import org.apache.asterix.transaction.management.runtime.CommitRuntime;
import org.apache.asterix.transaction.management.service.logging.LogReader;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index b80fa30..a2a3b48 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -26,6 +26,7 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.asterix.app.cc.CcApplicationContext;
import org.apache.asterix.app.replication.NcLifecycleCoordinator;
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
@@ -37,7 +38,6 @@
import org.apache.asterix.hyracks.bootstrap.CCApplication;
import org.apache.asterix.runtime.transaction.ResourceIdManager;
import org.apache.asterix.runtime.utils.BulkTxnIdFactory;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.ICCServiceContext;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index c71e602..e5e33d0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -34,6 +34,7 @@
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.app.cc.CcApplicationContext;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -53,7 +54,6 @@
import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.asterix.runtime.functions.FunctionCollection;
import org.apache.asterix.runtime.functions.FunctionManager;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.test.active.TestEventsListener.Behavior;
import org.apache.asterix.test.base.TestMethodTracer;
import org.apache.asterix.translator.IStatementExecutor;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index c51fd95..cb123bf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -37,6 +37,7 @@
import org.apache.asterix.app.active.ActiveEntityEventsListener;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.cc.CCExtensionManager;
+import org.apache.asterix.app.cc.CcApplicationContext;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.app.result.ResponsePrinter;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -45,7 +46,6 @@
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.SessionOutput;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index 67f8253..f4e241c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.result.IResultSet;
public interface IApplicationContext {
@@ -67,6 +68,11 @@
IHyracksClientConnection getHcc() throws HyracksDataException;
/**
+ * @return a result set provider associated with {@link IHyracksClientConnection}
+ */
+ IResultSet getResultSet() throws HyracksDataException;
+
+ /**
* @return the cluster coordination service.
*/
ICoordinationService getCoordinationService();
diff --git a/hyracks-fullstack/hyracks/hyracks-client/pom.xml b/hyracks-fullstack/hyracks/hyracks-client/pom.xml
index 409da1f8..66b71fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-client/pom.xml
@@ -101,6 +101,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
index b335a93..5dd5fff 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.client.result;
+import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -35,7 +36,7 @@
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
//TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client?
-public class ResultDirectory implements IResultDirectory {
+public class ResultDirectory implements IResultDirectory, Closeable {
private final IPCSystem ipc;
private final IResultDirectory remoteResultDirectory;
@@ -64,4 +65,9 @@
public IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws Exception {
return remoteResultDirectory.getResultMetadata(jobId, rsId);
}
+
+ @Override
+ public void close() throws IOException {
+ ipc.stop();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
index 4d8767f..8a88045 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
@@ -18,6 +18,9 @@
*/
package org.apache.hyracks.client.result;
+import java.io.Closeable;
+import java.io.IOException;
+
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.context.IHyracksCommonContext;
@@ -25,15 +28,15 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.network.ISocketChannelFactory;
-import org.apache.hyracks.api.result.IResultDirectory;
import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.api.result.IResultSetReader;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.client.net.ClientNetworkManager;
import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+import org.apache.hyracks.util.NetworkUtil;
-public class ResultSet implements IResultSet {
- private final IResultDirectory resultDirectory;
+public class ResultSet implements IResultSet, Closeable {
+ private final ResultDirectory resultDirectory;
private final ClientNetworkManager netManager;
@@ -51,6 +54,15 @@
}
@Override
+ public void close() throws IOException {
+ try {
+ netManager.stop();
+ } finally {
+ NetworkUtil.closeQuietly(resultDirectory);
+ }
+ }
+
+ @Override
public IResultSetReader createReader(JobId jobId, ResultSetId resultSetId) throws HyracksDataException {
IResultSetReader reader = null;
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
index 9351348..5b65de8 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.ipc.impl;
+import java.io.Closeable;
import java.io.File;
import java.net.InetSocketAddress;
import java.net.URL;
@@ -70,7 +71,7 @@
*
* @author vinayakb
*/
-public final class HyracksConnection implements IHyracksClientConnection {
+public final class HyracksConnection implements Closeable, IHyracksClientConnection {
private static final Logger LOGGER = LogManager.getLogger();
@@ -123,6 +124,11 @@
}
@Override
+ public void close() {
+ ipc.stop();
+ }
+
+ @Override
public JobStatus getJobStatus(JobId jobId) throws Exception {
return hci.getJobStatus(jobId);
}