[NO ISSUE][TEST] Add NC Storage API Test
- user model changes: no
- storage format changes: no
- interface changes: yes
- Add IPartitionReplica to use it at the
APIs level.
- Rename IStorageSubsystem -> IReplicaManager
Details:
- Add option to TestExecutor to target specific
NC end point.
- Add storage API test case.
Change-Id: I76c336a66e32036a34d30ce6d3a31195c342c4a9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2195
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d8636c8..8e73405 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -30,8 +30,8 @@
import java.util.stream.Collectors;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.PartitionReplica;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
@@ -109,17 +109,18 @@
private JsonNode getStatus(Predicate<Integer> predicate) {
final ArrayNode status = OBJECT_MAPPER.createArrayNode();
- final IStorageSubsystem storageSubsystem = appCtx.getStorageSubsystem();
+ final IReplicaManager storageSubsystem = appCtx.getReplicaManager();
final Set<Integer> partitions =
storageSubsystem.getPartitions().stream().filter(predicate).collect(Collectors.toSet());
for (Integer partition : partitions) {
final ObjectNode partitionJson = OBJECT_MAPPER.createObjectNode();
partitionJson.put("partition", partition);
- final List<PartitionReplica> replicas = storageSubsystem.getReplicas(partition);
+ final List<IPartitionReplica> replicas = storageSubsystem.getReplicas(partition);
ArrayNode replicasArray = OBJECT_MAPPER.createArrayNode();
- for (PartitionReplica replica : replicas) {
+ for (IPartitionReplica replica : replicas) {
final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode();
- replicaJson.put("location", replica.getIdentifier().getLocation().toString());
+ final InetSocketAddress location = replica.getIdentifier().getLocation();
+ replicaJson.put("location", location.getHostString() + ":" + location.getPort());
replicaJson.put("status", replica.getStatus().toString());
replicasArray.add(replicaJson);
}
@@ -135,7 +136,7 @@
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
- appCtx.getStorageSubsystem().addReplica(replicaIdentifier);
+ appCtx.getReplicaManager().addReplica(replicaIdentifier);
response.setStatus(HttpResponseStatus.OK);
}
@@ -145,7 +146,7 @@
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
- appCtx.getStorageSubsystem().removeReplica(replicaIdentifier);
+ appCtx.getReplicaManager().removeReplica(replicaIdentifier);
response.setStatus(HttpResponseStatus.OK);
}
@@ -156,7 +157,7 @@
if (partition == null || host == null || port == null) {
return null;
}
- final InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(host, Integer.valueOf(port));
+ final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index b6bf2df..75159af 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -61,7 +61,7 @@
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -142,8 +142,8 @@
private final NCExtensionManager ncExtensionManager;
private final IStorageComponentProvider componentProvider;
private IHyracksClientConnection hcc;
- private IStorageSubsystem storageSubsystem;
private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+ private IReplicaManager replicaManager;
public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions)
throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -213,7 +213,7 @@
final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
final Set<Integer> nodePartitionsIds = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
.collect(Collectors.toSet());
- storageSubsystem = new StorageSubsystem(nodePartitionsIds);
+ replicaManager = new ReplicaManager(nodePartitionsIds);
isShuttingdown = false;
activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(),
@@ -528,8 +528,8 @@
}
@Override
- public IStorageSubsystem getStorageSubsystem() {
- return storageSubsystem;
+ public IReplicaManager getReplicaManager() {
+ return replicaManager;
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
similarity index 85%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 24aa376..0c84a6e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -26,11 +26,12 @@
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.PartitionReplica;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.asterix.replication.storage.PartitionReplica;
-public class StorageSubsystem implements IStorageSubsystem {
+public class ReplicaManager implements IReplicaManager {
/**
* the partitions to which the current node is master
@@ -41,7 +42,7 @@
*/
private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
- public StorageSubsystem(Set<Integer> partitions) {
+ public ReplicaManager(Set<Integer> partitions) {
this.partitions.addAll(partitions);
}
@@ -52,6 +53,7 @@
"This node is not the current master of partition(" + id.getPartition() + ")");
}
replicas.computeIfAbsent(id, key -> new PartitionReplica(key));
+ replicas.get(id).sync();
}
@Override
@@ -63,7 +65,7 @@
}
@Override
- public List<PartitionReplica> getReplicas(int partition) {
+ public List<IPartitionReplica> getReplicas(int partition) {
return replicas.entrySet().stream().filter(e -> e.getKey().getPartition() == partition).map(Map.Entry::getValue)
.collect(Collectors.toList());
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index b518f94..d840daf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -57,6 +57,7 @@
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Stream;
import org.apache.asterix.app.external.IExternalUDFLibrarian;
import org.apache.asterix.common.api.Duration;
@@ -129,6 +130,8 @@
private static Method managixExecuteMethod = null;
private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
+ private static Map<String, InetSocketAddress> ncEndPoints;
+ private static Map<String, InetSocketAddress> replicationAddress;
/*
* Instance members
@@ -158,6 +161,14 @@
this.librarian = librarian;
}
+ public void setNcEndPoints(Map<String, InetSocketAddress> ncEndPoints) {
+ this.ncEndPoints = ncEndPoints;
+ }
+
+ public void setNcReplicationAddress(Map<String, InetSocketAddress> replicationAddress) {
+ this.replicationAddress = replicationAddress;
+ }
+
/**
* Probably does not work well with symlinks.
*/
@@ -1139,7 +1150,10 @@
// we only reach here if the loop is over
testLoops.remove(testFile);
break;
-
+ case "sto":
+ command = stripJavaComments(statement).trim().split(" ");
+ executeStorageCommand(command);
+ break;
default:
throw new IllegalArgumentException("No statements of type " + ctx.getType());
}
@@ -1510,15 +1524,26 @@
}
protected URI createEndpointURI(String path, String query) throws URISyntaxException {
- int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
- InetSocketAddress endpoint = endpoints.get(endpointIdx);
+ InetSocketAddress endpoint;
+ if (!path.startsWith("nc:")) {
+ int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
+ endpoint = endpoints.get(endpointIdx);
+ } else {
+ final String[] tokens = path.split(" ");
+ if (tokens.length != 2) {
+ throw new IllegalArgumentException("Unrecognized http pattern");
+ }
+ String nodeId = tokens[0].substring(3);
+ endpoint = getNcEndPoint(nodeId);
+ path = tokens[1];
+ }
URI uri = new URI("http", null, endpoint.getHostString(), endpoint.getPort(), path, query, null);
LOGGER.fine("Created endpoint URI: " + uri);
return uri;
}
public URI getEndpoint(String servlet) throws URISyntaxException {
- return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), null);
+ return createEndpointURI(Servlets.getAbsolutePath(getPath(servlet)), null);
}
public static String stripJavaComments(String text) {
@@ -1622,6 +1647,41 @@
LOGGER.info("Cluster state now " + desiredState);
}
+ private void executeStorageCommand(String[] command) throws Exception {
+ String srcNode = command[0];
+ String api = command[1];
+ final URI endpoint = getEndpoint(srcNode + " " + Servlets.getAbsolutePath(Servlets.STORAGE) + api);
+ String partition = command[2];
+ String destNode = command[3];
+ final InetSocketAddress destAddress = getNcReplicationAddress(destNode);
+ List<Parameter> parameters = new ArrayList<>(3);
+ Stream.of("partition", "host", "port").forEach(arg -> {
+ Parameter p = new Parameter();
+ p.setName(arg);
+ parameters.add(p);
+ });
+ parameters.get(0).setValue(partition);
+ parameters.get(1).setValue(destAddress.getHostName());
+ parameters.get(2).setValue(String.valueOf(destAddress.getPort()));
+ final HttpUriRequest httpUriRequest = constructPostMethod(endpoint, parameters);
+ final HttpResponse httpResponse = executeHttpRequest(httpUriRequest);
+ Assert.assertEquals(HttpStatus.SC_OK, httpResponse.getStatusLine().getStatusCode());
+ }
+
+ private InetSocketAddress getNcEndPoint(String nodeId) {
+ if (ncEndPoints == null || !ncEndPoints.containsKey(nodeId)) {
+ throw new IllegalStateException("No end point specified for node: " + nodeId);
+ }
+ return ncEndPoints.get(nodeId);
+ }
+
+ private InetSocketAddress getNcReplicationAddress(String nodeId) {
+ if (replicationAddress == null || !replicationAddress.containsKey(nodeId)) {
+ throw new IllegalStateException("No replication address specified for node: " + nodeId);
+ }
+ return replicationAddress.get(nodeId);
+ }
+
abstract static class TestLoop extends Exception {
private final String target;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
new file mode 100644
index 0000000..56c7bc0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class ReplicationExecutionTest {
+ protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+ private static final TestExecutor testExecutor = new TestExecutor();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+ Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+ Map<String, InetSocketAddress> replicationAddress = new HashMap<>();
+ final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+ for (NodeControllerService nc : ncs) {
+ final String nodeId = nc.getId();
+ final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+ int apiPort = appCtx.getExternalProperties().getNcApiPort();
+ int replicationPort = appCtx.getReplicationProperties().getDataReplicationPort(nodeId);
+ ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+ replicationAddress.put(nodeId, InetSocketAddress.createUnresolved(ip, replicationPort));
+ }
+ testExecutor.setNcEndPoints(ncEndPoints);
+ testExecutor.setNcReplicationAddress(replicationAddress);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "ReplicationExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests("replication.xml", "replication.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public ReplicationExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
new file mode 100644
index 0000000..7ddaa20
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /addReplica 0 asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
new file mode 100644
index 0000000..d287fad
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /admin/storage/partition/0
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
new file mode 100644
index 0000000..a635676
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
@@ -0,0 +1,28 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
+ QueryFileExtension=".sqlpp">
+ <test-group name="replication">
+ <test-case FilePath="replication">
+ <compilation-unit name="add_replica">
+ <output-dir compare="Text">add_replica</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
new file mode 100644
index 0000000..3553d9c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
@@ -0,0 +1,7 @@
+[ {
+ "partition" : 0,
+ "replicas" : [ {
+ "location" : "127.0.0.1:2017",
+ "status" : "DISCONNECTED"
+ } ]
+} ]
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 0503c09..28be6fa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -29,8 +29,7 @@
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -119,7 +118,7 @@
@Override
INCServiceContext getServiceContext();
- IStorageSubsystem getStorageSubsystem();
-
IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
+
+ IReplicaManager getReplicaManager();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
new file mode 100644
index 0000000..034d668
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.exceptions;
+
+public class ReplicationException extends RuntimeException {
+
+ public ReplicationException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
new file mode 100644
index 0000000..5a9dc3f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+
+public interface IPartitionReplica {
+
+ enum PartitionReplicaStatus {
+ /* replica is in-sync with master */
+ IN_SYNC,
+ /* replica is still catching up with master */
+ CATCHING_UP,
+ /* replica is not connected with master */
+ DISCONNECTED
+ }
+
+ /**
+ * Gets the status of a replica.
+ *
+ * @return The status
+ */
+ PartitionReplicaStatus getStatus();
+
+ /**
+ * Gets the identifier of a replica
+ *
+ * @return The identifier
+ */
+ ReplicaIdentifier getIdentifier();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
similarity index 90%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index b4f06cb..a3b2b50 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -21,7 +21,9 @@
import java.util.List;
import java.util.Set;
-public interface IStorageSubsystem {
+import org.apache.asterix.common.replication.IPartitionReplica;
+
+public interface IReplicaManager {
/**
* Adds a replica with the specified {@code id}
@@ -43,7 +45,7 @@
* @param partition
* @return The list of replicas
*/
- List<PartitionReplica> getReplicas(int partition);
+ List<IPartitionReplica> getReplicas(int partition);
/**
* Gets the list of partition to which the current node is
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index d5f23bf..1ac3ffa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -46,4 +46,8 @@
private Servlets() {
}
+
+ public static String getAbsolutePath(String servlet) {
+ return servlet.replaceAll("/\\*$", "");
+ }
}
diff --git a/asterixdb/asterix-replication/pom.xml b/asterixdb/asterix-replication/pom.xml
index f209aae..2b5fe0c 100644
--- a/asterixdb/asterix-replication/pom.xml
+++ b/asterixdb/asterix-replication/pom.xml
@@ -71,6 +71,14 @@
<artifactId>asterix-transactions</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
new file mode 100644
index 0000000..c6d1b60
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.storage;
+
+import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.CATCHING_UP;
+import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED;
+import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC;
+
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+@ThreadSafe
+public class PartitionReplica implements IPartitionReplica {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final ReplicaIdentifier id;
+ private PartitionReplicaStatus status = DISCONNECTED;
+
+ public PartitionReplica(ReplicaIdentifier id) {
+ this.id = id;
+ }
+
+ @Override
+ public synchronized PartitionReplicaStatus getStatus() {
+ return status;
+ }
+
+ @Override
+ public ReplicaIdentifier getIdentifier() {
+ return id;
+ }
+
+ public synchronized void sync() {
+ if (status == IN_SYNC || status == CATCHING_UP) {
+ return;
+ }
+ }
+
+ public JsonNode asJson() {
+ ObjectNode json = OBJECT_MAPPER.createObjectNode();
+ json.put("id", id.toString());
+ json.put("state", status.name());
+ return json;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartitionReplica that = (PartitionReplica) o;
+ return id.equals(that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return JSONUtil.convertNode(asJson());
+ } catch (JsonProcessingException e) {
+ throw new ReplicationException(e);
+ }
+ }
+}
\ No newline at end of file