[ASTERIXDB-3072] Make HDFS scheduler to use different type of channels
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
HDFS scheduler only uses plain connection. This change allows it
to use different type of connections
Change-Id: I64f9f4a30a6564aac26e99433ccc695975a22a3b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17224
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 9e49d86..3506216 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -62,6 +62,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
import org.apache.hyracks.hdfs.scheduler.Scheduler;
import org.apache.parquet.hadoop.ParquetInputFormat;
@@ -72,10 +73,12 @@
public static Scheduler initializeHDFSScheduler(ICCServiceContext serviceCtx) throws HyracksDataException {
ICCContext ccContext = serviceCtx.getCCContext();
+ INetworkSecurityManager networkSecurityManager = serviceCtx.getControllerService().getNetworkSecurityManager();
Scheduler scheduler = null;
try {
scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
- ccContext.getClusterControllerInfo().getClientNetPort());
+ ccContext.getClusterControllerInfo().getClientNetPort(),
+ networkSecurityManager.getSocketChannelFactory());
} catch (HyracksException e) {
throw new RuntimeDataException(ErrorCode.UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
index b9d68f7..f5bf07b 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.hdfs.api.INcCollection;
import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
@@ -56,16 +57,24 @@
private static final Logger LOGGER = LogManager.getLogger();
- /** a list of NCs */
+ /**
+ * a list of NCs
+ */
private String[] NCs;
- /** a map from ip to NCs */
+ /**
+ * a map from ip to NCs
+ */
private Map<String, List<String>> ipToNcMapping = new HashMap<>();
- /** a map from the NC name to the index */
+ /**
+ * a map from the NC name to the index
+ */
private Map<String, Integer> ncNameToIndex = new HashMap<>();
- /** a map from NC name to the NodeControllerInfo */
+ /**
+ * a map from NC name to the NodeControllerInfo
+ */
private Map<String, NodeControllerInfo> ncNameToNcInfos;
/**
@@ -76,13 +85,15 @@
/**
* The constructor of the scheduler.
*
- * @param ncNameToNcInfos
+ * @param ipAddress IP address
+ * @param port Port
+ * @param channelFactory Channel Factory
* @throws HyracksException
*/
- public Scheduler(String ipAddress, int port) throws HyracksException {
+ public Scheduler(String ipAddress, int port, ISocketChannelFactory channelFactory) throws HyracksException {
try {
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port, channelFactory);
this.ncNameToNcInfos = hcc.getNodeControllerInfos();
ClusterTopology topology = hcc.getClusterTopology();
this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
@@ -96,23 +107,6 @@
/**
* The constructor of the scheduler.
*
- * @param ncNameToNcInfos
- * @throws HyracksException
- */
- public Scheduler(String ipAddress, int port, INcCollectionBuilder ncCollectionBuilder) throws HyracksException {
- try {
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- this.ncNameToNcInfos = hcc.getNodeControllerInfos();
- this.ncCollectionBuilder = ncCollectionBuilder;
- loadIPAddressToNCMap(ncNameToNcInfos);
- } catch (Exception e) {
- throw HyracksException.create(e);
- }
- }
-
- /**
- * The constructor of the scheduler.
- *
* @param ncNameToNcInfos the mapping from nc names to nc infos
* @throws HyracksException
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
index ddf140f..a26a5f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
@@ -44,15 +45,14 @@
* @param ncNameToNcInfos
* @throws HyracksException
*/
- public Scheduler(String ipAddress, int port) throws HyracksException {
- scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ipAddress, port);
+ public Scheduler(String ipAddress, int port, ISocketChannelFactory channelFactory) throws HyracksException {
+ scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ipAddress, port, channelFactory);
}
/**
* The constructor of the scheduler.
*
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
+ * @param ncNameToNcInfos the mapping from nc names to nc infos
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
@@ -62,10 +62,8 @@
/**
* The constructor of the scheduler.
*
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
- * @param topology
- * the hyracks cluster toplogy
+ * @param ncNameToNcInfos the mapping from nc names to nc infos
+ * @param topology the hyracks cluster toplogy
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
@@ -76,8 +74,7 @@
/**
* The constructor of the scheduler.
*
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
+ * @param ncNameToNcInfos the mapping from nc names to nc infos
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder)