[NO ISSUE][API] Add NC Storage API
- user model changes: no
- storage format changes: no
- interface changes: yes
Add IStorageSubsystem to track storage partitions
replicas.
Details:
- Add NC API port.
- Add storage API to NCs.
- Add StorageSubsystem to track storage partitions
replicas.
Change-Id: I120d9892bc9fe5a73395cd5a2ddc30b51b73ced2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2190
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 2a8a831..98dabc9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -321,6 +321,7 @@
@Override
public void stop() throws Exception {
// ungraceful shutdown
+ webManager.stop();
}
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
new file mode 100644
index 0000000..d8636c8
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.PartitionReplica;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class StorageApiServlet extends AbstractServlet {
+
+ private static final Logger LOGGER = Logger.getLogger(StorageApiServlet.class.getName());
+ private final INcApplicationContext appCtx;
+
+ public StorageApiServlet(ConcurrentMap<String, Object> ctx, INcApplicationContext appCtx, String... paths) {
+ super(ctx, paths);
+ this.appCtx = appCtx;
+ }
+
+ @Override
+ protected void get(IServletRequest request, IServletResponse response) throws IOException {
+ HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
+ PrintWriter responseWriter = response.writer();
+ try {
+ JsonNode json;
+ response.setStatus(HttpResponseStatus.OK);
+ final String path = localPath(request);
+ if ("".equals(path)) {
+ json = getStatus(p -> true);
+ } else if (path.startsWith("/partition")) {
+ json = getPartitionStatus(path);
+ } else {
+ throw new IllegalArgumentException();
+ }
+ JSONUtil.writeNode(responseWriter, json);
+ } catch (IllegalArgumentException e) {
+ LOGGER.log(Level.INFO, "Unrecognized path: " + request, e);
+ response.setStatus(HttpResponseStatus.NOT_FOUND);
+ } catch (Exception e) {
+ LOGGER.log(Level.INFO, "exception thrown for " + request, e);
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ responseWriter.write(e.toString());
+ }
+ responseWriter.flush();
+ }
+
+ @Override
+ protected void post(IServletRequest request, IServletResponse response) throws Exception {
+ switch (localPath(request)) {
+ case "/addReplica":
+ processAddReplica(request, response);
+ break;
+ case "/removeReplica":
+ processRemoveReplica(request, response);
+ break;
+ default:
+ sendError(response, HttpResponseStatus.NOT_FOUND);
+ break;
+ }
+ }
+
+ private JsonNode getPartitionStatus(String path) {
+ String[] token = path.split("/");
+ if (token.length != 3) {
+ throw new IllegalArgumentException();
+ }
+ // get the partition number from the path
+ final Integer partition = Integer.valueOf(token[2]);
+ return getStatus(partition::equals);
+ }
+
+ private JsonNode getStatus(Predicate<Integer> predicate) {
+ final ArrayNode status = OBJECT_MAPPER.createArrayNode();
+ final IStorageSubsystem storageSubsystem = appCtx.getStorageSubsystem();
+ final Set<Integer> partitions =
+ storageSubsystem.getPartitions().stream().filter(predicate).collect(Collectors.toSet());
+ for (Integer partition : partitions) {
+ final ObjectNode partitionJson = OBJECT_MAPPER.createObjectNode();
+ partitionJson.put("partition", partition);
+ final List<PartitionReplica> replicas = storageSubsystem.getReplicas(partition);
+ ArrayNode replicasArray = OBJECT_MAPPER.createArrayNode();
+ for (PartitionReplica replica : replicas) {
+ final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode();
+ replicaJson.put("location", replica.getIdentifier().getLocation().toString());
+ replicaJson.put("status", replica.getStatus().toString());
+ replicasArray.add(replicaJson);
+ }
+ partitionJson.set("replicas", replicasArray);
+ status.add(partitionJson);
+ }
+ return status;
+ }
+
+ private void processAddReplica(IServletRequest request, IServletResponse response) {
+ final ReplicaIdentifier replicaIdentifier = getReplicaIdentifier(request);
+ if (replicaIdentifier == null) {
+ response.setStatus(HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ appCtx.getStorageSubsystem().addReplica(replicaIdentifier);
+ response.setStatus(HttpResponseStatus.OK);
+ }
+
+ private void processRemoveReplica(IServletRequest request, IServletResponse response) {
+ final ReplicaIdentifier replicaIdentifier = getReplicaIdentifier(request);
+ if (replicaIdentifier == null) {
+ response.setStatus(HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ appCtx.getStorageSubsystem().removeReplica(replicaIdentifier);
+ response.setStatus(HttpResponseStatus.OK);
+ }
+
+ private ReplicaIdentifier getReplicaIdentifier(IServletRequest request) {
+ final String partition = request.getParameter("partition");
+ final String host = request.getParameter("host");
+ final String port = request.getParameter("port");
+ if (partition == null || host == null || port == null) {
+ return null;
+ }
+ final InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(host, Integer.valueOf(port));
+ return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index a3def26..5cae2d6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -22,12 +22,14 @@
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
@@ -58,6 +60,7 @@
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IStorageSubsystem;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -138,6 +141,7 @@
private final NCExtensionManager ncExtensionManager;
private final IStorageComponentProvider componentProvider;
private IHyracksClientConnection hcc;
+ private IStorageSubsystem storageSubsystem;
public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions)
throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -204,15 +208,17 @@
datasetLifecycleManager =
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
datasetMemoryManager, ioManager.getIODevices().size());
-
+ final String nodeId = getServiceContext().getNodeId();
+ final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+ final Set<Integer> nodePartitionsIds = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
+ .collect(Collectors.toSet());
+ storageSubsystem = new StorageSubsystem(nodePartitionsIds);
isShuttingdown = false;
-
activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(),
this.ncServiceContext);
if (replicationProperties.isParticipant(getServiceContext().getNodeId())) {
- String nodeId = getServiceContext().getNodeId();
replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
@@ -518,4 +524,9 @@
}
return hcc;
}
+
+ @Override
+ public IStorageSubsystem getStorageSubsystem() {
+ return storageSubsystem;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
new file mode 100644
index 0000000..24aa376
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.PartitionReplica;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+
+public class StorageSubsystem implements IStorageSubsystem {
+
+ /**
+ * the partitions to which the current node is master
+ */
+ private final Set<Integer> partitions = new HashSet<>();
+ /**
+ * current replicas
+ */
+ private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
+
+ public StorageSubsystem(Set<Integer> partitions) {
+ this.partitions.addAll(partitions);
+ }
+
+ @Override
+ public synchronized void addReplica(ReplicaIdentifier id) {
+ if (!partitions.contains(id.getPartition())) {
+ throw new IllegalStateException(
+ "This node is not the current master of partition(" + id.getPartition() + ")");
+ }
+ replicas.computeIfAbsent(id, key -> new PartitionReplica(key));
+ }
+
+ @Override
+ public synchronized void removeReplica(ReplicaIdentifier id) {
+ if (!replicas.containsKey(id)) {
+ throw new IllegalStateException("replica with id(" + id + ") does not exist");
+ }
+ replicas.remove(id);
+ }
+
+ @Override
+ public List<PartitionReplica> getReplicas(int partition) {
+ return replicas.entrySet().stream().filter(e -> e.getKey().getPartition() == partition).map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Set<Integer> getPartitions() {
+ return Collections.unmodifiableSet(partitions);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 8b417a9..36ed35d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -25,6 +25,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.api.http.server.ServletConstants;
+import org.apache.asterix.api.http.server.StorageApiServlet;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.common.api.AsterixThreadFactory;
@@ -41,6 +43,7 @@
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.utils.PrintUtil;
+import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.event.schema.cluster.Cluster;
@@ -59,6 +62,7 @@
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.BaseNCApplication;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.WebManager;
public class NCApplication extends BaseNCApplication {
@@ -137,7 +141,11 @@
}
protected void configureServers() throws Exception {
- // override to start web services on NC nodes
+ HttpServer apiServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(),
+ getApplicationContext().getExternalProperties().getNcApiPort());
+ apiServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ncServiceCtx);
+ apiServer.addServlet(new StorageApiServlet(apiServer.ctx(), getApplicationContext(), Servlets.STORAGE));
+ webManager.add(apiServer);
}
protected List<AsterixExtension> getExtensions() {
@@ -210,8 +218,8 @@
StorageProperties storageProperties = runtimeContext.getStorageProperties();
// Deducts the reserved buffer cache size and memory component size from the maxium heap size,
// and deducts one core for processing heartbeats.
- long memorySize = Runtime.getRuntime().maxMemory() - storageProperties.getBufferCacheSize()
- - storageProperties.getMemoryComponentGlobalBudget();
+ long memorySize = Runtime.getRuntime().maxMemory() - storageProperties.getBufferCacheSize() - storageProperties
+ .getMemoryComponentGlobalBudget();
int allCores = Runtime.getRuntime().availableProcessors();
int maximumCoresForComputation = allCores > 1 ? allCores - 1 : allCores;
return new NodeCapacity(memorySize, maximumCoresForComputation);
diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml b/asterixdb/asterix-app/src/main/resources/cluster.xml
index 7f78b26..7b7d52a 100644
--- a/asterixdb/asterix-app/src/main/resources/cluster.xml
+++ b/asterixdb/asterix-app/src/main/resources/cluster.xml
@@ -48,10 +48,12 @@
<id>nc1</id>
<cluster_ip>127.0.0.1</cluster_ip>
<replication_port>2016</replication_port>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>127.0.0.1</cluster_ip>
<replication_port>2017</replication_port>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8afa66d..162e693 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IStorageSubsystem;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -115,4 +116,6 @@
@Override
INCServiceContext getServiceContext();
+
+ IStorageSubsystem getStorageSubsystem();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index bde8303..e718903 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -18,7 +18,9 @@
*/
package org.apache.asterix.common.config;
-import static org.apache.hyracks.control.common.config.OptionTypes.*;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.LEVEL;
+import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
@@ -31,6 +33,7 @@
WEB_QUERYINTERFACE_PORT(INTEGER, 19006, "The listen port of the query web interface"),
API_PORT(INTEGER, 19002, "The listen port of the API server"),
ACTIVE_PORT(INTEGER, 19003, "The listen port of the active server"),
+ NC_API_PORT(INTEGER, 19004, "The listen port of the node controller API server"),
LOG_LEVEL(LEVEL, java.util.logging.Level.WARNING, "The logging level for master and slave processes"),
MAX_WAIT_ACTIVE_CLUSTER(INTEGER, 60, "The max pending time (in seconds) for cluster startup. After the " +
"threshold, if the cluster still is not up and running, it is considered unavailable"),
@@ -55,6 +58,8 @@
case API_PORT:
case ACTIVE_PORT:
return Section.CC;
+ case NC_API_PORT:
+ return Section.NC;
case LOG_LEVEL:
case MAX_WAIT_ACTIVE_CLUSTER:
return Section.COMMON;
@@ -117,4 +122,8 @@
public String getCCJavaParams() {
return accessor.getString(Option.CC_JAVA_OPTS);
}
+
+ public int getNcApiPort() {
+ return accessor.getInt(Option.NC_API_PORT);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index eacc18b..467e877 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -59,6 +59,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.utils.ConfigUtil;
+import org.apache.asterix.event.schema.cluster.Node;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -142,6 +143,11 @@
// marking node as virtual, as we're not using NCServices with old-style config
configManager.set(store.getNcId(), NCConfig.Option.NCSERVICE_PORT, NCConfig.NCSERVICE_PORT_DISABLED);
}
+ // populate nc api port from cluster properties
+ final ExternalProperties.Option ncApiPort = ExternalProperties.Option.NC_API_PORT;
+ for (Node node : ClusterProperties.INSTANCE.getCluster().getNode()) {
+ configManager.set(node.getId(), ncApiPort, node.getNcApiPort().intValue());
+ }
// Get extensions
if (asterixConfiguration.getExtensions() != null) {
for (Extension ext : asterixConfiguration.getExtensions().getExtension()) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
new file mode 100644
index 0000000..b4f06cb
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.util.List;
+import java.util.Set;
+
+public interface IStorageSubsystem {
+
+ /**
+ * Adds a replica with the specified {@code id}
+ *
+ * @param id
+ */
+ void addReplica(ReplicaIdentifier id);
+
+ /**
+ * Removes the replica with the specified {@code id}
+ *
+ * @param id
+ */
+ void removeReplica(ReplicaIdentifier id);
+
+ /**
+ * The existing replicas of the partition {@code partition}
+ *
+ * @param partition
+ * @return The list of replicas
+ */
+ List<PartitionReplica> getReplicas(int partition);
+
+ /**
+ * Gets the list of partition to which the current node is
+ * the master of.
+ *
+ * @return The list of partition
+ */
+ Set<Integer> getPartitions();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java
new file mode 100644
index 0000000..18733ce
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.CATCHING_UP;
+import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.DISCONNECTED;
+import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.IN_SYNC;
+
+import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+@ThreadSafe
+public class PartitionReplica {
+
+ public enum PartitionReplicaStatus {
+ /* replica is in-sync with master */
+ IN_SYNC,
+ /* replica is still catching up with master */
+ CATCHING_UP,
+ /* replica is not connected with master */
+ DISCONNECTED
+ }
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final ReplicaIdentifier id;
+ private PartitionReplicaStatus status = DISCONNECTED;
+
+ public PartitionReplica(ReplicaIdentifier id) {
+ this.id = id;
+ }
+
+ public synchronized PartitionReplicaStatus getStatus() {
+ return status;
+ }
+
+ public ReplicaIdentifier getIdentifier() {
+ return id;
+ }
+
+ public synchronized void sync() {
+ if (status == IN_SYNC || status == CATCHING_UP) {
+ return;
+ }
+ //TODO complete implementation
+ }
+
+ public JsonNode asJson() {
+ ObjectNode json = OBJECT_MAPPER.createObjectNode();
+ json.put("id", id.toString());
+ json.put("state", status.name());
+ return json;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartitionReplica that = (PartitionReplica) o;
+ return id.equals(that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return JSONUtil.convertNode(asJson());
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
new file mode 100644
index 0000000..01ffba6
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.net.InetSocketAddress;
+
+public class ReplicaIdentifier {
+
+ private final int partition;
+ private final InetSocketAddress location;
+ private final String id;
+
+ private ReplicaIdentifier(int partition, InetSocketAddress location) {
+ this.partition = partition;
+ this.location = location;
+ id = partition + "@" + location.toString();
+ }
+
+ public static ReplicaIdentifier of(int partition, InetSocketAddress location) {
+ return new ReplicaIdentifier(partition, location);
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public InetSocketAddress getLocation() {
+ return location;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReplicaIdentifier that = (ReplicaIdentifier) o;
+ return id.equals(that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index 0f7ab4d..d5f23bf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -42,6 +42,7 @@
public static final String CLUSTER_STATE_CC_DETAIL = "/admin/cluster/cc/*";
public static final String DIAGNOSTICS = "/admin/diagnostics";
public static final String ACTIVE_STATS = "/admin/active/*";
+ public static final String STORAGE = "/admin/storage/*";
private Servlets() {
}
diff --git a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
index 8cd7b42..be189e3 100644
--- a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
@@ -58,6 +58,7 @@
<xs:element name="cc_root" type="xs:string" />
<xs:element name="strategy" type="xs:string" />
<xs:element name="node_id" type="xs:string" />
+ <xs:element name="nc_api_port" type="xs:integer" />
<!-- definition of complex elements -->
<xs:element name="working_dir">
@@ -143,6 +144,7 @@
<xs:element ref="cl:iodevices" minOccurs="0" />
<xs:element ref="cl:debug_port" minOccurs="0" />
<xs:element ref="cl:replication_port" minOccurs="0" />
+ <xs:element ref="cl:nc_api_port" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index 415433e..c7f16fc 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -42,7 +42,8 @@
public static final String CLIENT_NODE_ID = "client_node";
public static final String CLUSTER_IP = "127.0.0.1";
- public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, CLUSTER_IP, null, null, null, null, null, null);
+ public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, CLUSTER_IP, null, null, null, null, null, null,
+ null);
private static String eventsDir;
private static Map<String, String> env = new HashMap<String, String>();
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index 4dc3124..9d684ec 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -186,7 +186,7 @@
String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
.getMasterNode().getJavaHome();
return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
- null, null, cluster.getMasterNode().getDebugPort(), null);
+ null, null, cluster.getMasterNode().getDebugPort(), null, null);
}
List<Node> nodeList = cluster.getNode();
diff --git a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 748d811..3e079bd 100644
--- a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -105,7 +105,7 @@
MasterNode masterNode = cluster.getMasterNode();
Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
- masterNode.getLogDir(), null, null, null, null);
+ masterNode.getLogDir(), null, null, null, null, null);
ipAddresses.add(masterNode.getClusterIp());
valid = valid & validateNodeConfiguration(master, cluster);
diff --git a/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml b/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml
index 2721eec..500172c 100644
--- a/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml
+++ b/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml
@@ -45,11 +45,13 @@
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/node1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/node1/1,/tmp/asterix/node1/2</iodevices>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>node2</id>
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/node2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/node2/1,/tmp/asterix/node2/2</iodevices>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
diff --git a/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml b/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml
index f2afe5e..b26d836 100644
--- a/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml
+++ b/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml
@@ -51,12 +51,13 @@
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices>
-
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
diff --git a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml
index 57d04c7..c445835 100644
--- a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml
+++ b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml
@@ -68,6 +68,7 @@
<txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices>
<replication_port>2000</replication_port>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
@@ -75,5 +76,6 @@
<txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices>
<replication_port>2001</replication_port>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml
index 7a435b7..fbe0de8 100644
--- a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml
+++ b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml
@@ -72,6 +72,7 @@
<txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices>
<replication_port>2000</replication_port>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
@@ -79,5 +80,6 @@
<txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices>
<replication_port>2001</replication_port>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml
index 9eb728f..5ad3921 100644
--- a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml
+++ b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml
@@ -46,9 +46,11 @@
<node>
<id>nc1</id>
<cluster_ip>10.10.0.3</cluster_ip>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>10.10.0.4</cluster_ip>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
diff --git a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
index bb66131..003b3c8 100644
--- a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
+++ b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
@@ -55,9 +55,11 @@
<node>
<id>nc1</id>
<cluster_ip>10.10.0.3</cluster_ip>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>10.10.0.4</cluster_ip>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml b/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml
index 4d5d0bd..ed82a55 100644
--- a/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml
+++ b/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml
@@ -60,17 +60,21 @@
<node>
<id>nc1</id>
<cluster_ip>172.20.0.3</cluster_ip>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>172.20.0.4</cluster_ip>
+ <nc_api_port>19005</nc_api_port>
</node>
<node>
<id>nc3</id>
<cluster_ip>172.20.0.5</cluster_ip>
+ <nc_api_port>19006</nc_api_port>
</node>
<node>
<id>nc4</id>
<cluster_ip>172.20.0.6</cluster_ip>
+ <nc_api_port>19007</nc_api_port>
</node>
</cluster>
diff --git a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
index 184728d..bec2122 100644
--- a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
+++ b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
@@ -19,12 +19,14 @@
txn.log.dir=data/red/txnlog
core.dump.dir=data/red/coredump
iodevices=data/red
+nc.api.port=19004
[nc/blue]
ncservice.port=9091
txn.log.dir=data/blue/txnlog
core.dump.dir=data/blue/coredump
iodevices=data/blue
+nc.api.port=19005
${NC_BLUE_EXTRA}
[nc]
diff --git a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
index 2a1c652..a6cb064 100644
--- a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
+++ b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
@@ -19,6 +19,7 @@
txn.log.dir=../asterix-server/target/tmp/asterix_nc1/txnlog
core.dump.dir=../asterix-server/target/tmp/asterix_nc1/coredump
iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2
+nc.api.port=19004
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
[nc/asterix_nc2]
@@ -26,6 +27,7 @@
txn.log.dir=../asterix-server/target/tmp/asterix_nc2/txnlog
core.dump.dir=../asterix-server/target/tmp/asterix_nc2/coredump
iodevices=../asterix-server/target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+nc.api.port=19005
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
[nc]