[NO ISSUE][TEST] Add NC Storage API Test

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Add IPartitionReplica to use it at the
    APIs level.
  - Rename IStorageSubsystem -> IReplicaManager

Details:
- Add option to TestExecutor to target specific
  NC end point.
- Add storage API test case.

Change-Id: I76c336a66e32036a34d30ce6d3a31195c342c4a9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2195
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d8636c8..8e73405 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -30,8 +30,8 @@
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.PartitionReplica;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -109,17 +109,18 @@
 
     private JsonNode getStatus(Predicate<Integer> predicate) {
         final ArrayNode status = OBJECT_MAPPER.createArrayNode();
-        final IStorageSubsystem storageSubsystem = appCtx.getStorageSubsystem();
+        final IReplicaManager storageSubsystem = appCtx.getReplicaManager();
         final Set<Integer> partitions =
                 storageSubsystem.getPartitions().stream().filter(predicate).collect(Collectors.toSet());
         for (Integer partition : partitions) {
             final ObjectNode partitionJson = OBJECT_MAPPER.createObjectNode();
             partitionJson.put("partition", partition);
-            final List<PartitionReplica> replicas = storageSubsystem.getReplicas(partition);
+            final List<IPartitionReplica> replicas = storageSubsystem.getReplicas(partition);
             ArrayNode replicasArray = OBJECT_MAPPER.createArrayNode();
-            for (PartitionReplica replica : replicas) {
+            for (IPartitionReplica replica : replicas) {
                 final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode();
-                replicaJson.put("location", replica.getIdentifier().getLocation().toString());
+                final InetSocketAddress location = replica.getIdentifier().getLocation();
+                replicaJson.put("location", location.getHostString() + ":" + location.getPort());
                 replicaJson.put("status", replica.getStatus().toString());
                 replicasArray.add(replicaJson);
             }
@@ -135,7 +136,7 @@
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             return;
         }
-        appCtx.getStorageSubsystem().addReplica(replicaIdentifier);
+        appCtx.getReplicaManager().addReplica(replicaIdentifier);
         response.setStatus(HttpResponseStatus.OK);
     }
 
@@ -145,7 +146,7 @@
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             return;
         }
-        appCtx.getStorageSubsystem().removeReplica(replicaIdentifier);
+        appCtx.getReplicaManager().removeReplica(replicaIdentifier);
         response.setStatus(HttpResponseStatus.OK);
     }
 
@@ -156,7 +157,7 @@
         if (partition == null || host == null || port == null) {
             return null;
         }
-        final InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(host, Integer.valueOf(port));
+        final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
         return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index b6bf2df..75159af 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -61,7 +61,7 @@
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -142,8 +142,8 @@
     private final NCExtensionManager ncExtensionManager;
     private final IStorageComponentProvider componentProvider;
     private IHyracksClientConnection hcc;
-    private IStorageSubsystem storageSubsystem;
     private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+    private IReplicaManager replicaManager;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions)
             throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -213,7 +213,7 @@
         final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
         final Set<Integer> nodePartitionsIds = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
                 .collect(Collectors.toSet());
-        storageSubsystem = new StorageSubsystem(nodePartitionsIds);
+        replicaManager = new ReplicaManager(nodePartitionsIds);
         isShuttingdown = false;
         activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
                 activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(),
@@ -528,8 +528,8 @@
     }
 
     @Override
-    public IStorageSubsystem getStorageSubsystem() {
-        return storageSubsystem;
+    public IReplicaManager getReplicaManager() {
+        return replicaManager;
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
similarity index 85%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 24aa376..0c84a6e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -26,11 +26,12 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.PartitionReplica;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.asterix.replication.storage.PartitionReplica;
 
-public class StorageSubsystem implements IStorageSubsystem {
+public class ReplicaManager implements IReplicaManager {
 
     /**
      * the partitions to which the current node is master
@@ -41,7 +42,7 @@
      */
     private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
 
-    public StorageSubsystem(Set<Integer> partitions) {
+    public ReplicaManager(Set<Integer> partitions) {
         this.partitions.addAll(partitions);
     }
 
@@ -52,6 +53,7 @@
                     "This node is not the current master of partition(" + id.getPartition() + ")");
         }
         replicas.computeIfAbsent(id, key -> new PartitionReplica(key));
+        replicas.get(id).sync();
     }
 
     @Override
@@ -63,7 +65,7 @@
     }
 
     @Override
-    public List<PartitionReplica> getReplicas(int partition) {
+    public List<IPartitionReplica> getReplicas(int partition) {
         return replicas.entrySet().stream().filter(e -> e.getKey().getPartition() == partition).map(Map.Entry::getValue)
                 .collect(Collectors.toList());
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index b518f94..d840daf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -57,6 +57,7 @@
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 
 import org.apache.asterix.app.external.IExternalUDFLibrarian;
 import org.apache.asterix.common.api.Duration;
@@ -129,6 +130,8 @@
 
     private static Method managixExecuteMethod = null;
     private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
+    private static Map<String, InetSocketAddress> ncEndPoints;
+    private static Map<String, InetSocketAddress> replicationAddress;
 
     /*
      * Instance members
@@ -158,6 +161,14 @@
         this.librarian = librarian;
     }
 
+    public void setNcEndPoints(Map<String, InetSocketAddress> ncEndPoints) {
+        this.ncEndPoints = ncEndPoints;
+    }
+
+    public void setNcReplicationAddress(Map<String, InetSocketAddress> replicationAddress) {
+        this.replicationAddress = replicationAddress;
+    }
+
     /**
      * Probably does not work well with symlinks.
      */
@@ -1139,7 +1150,10 @@
                 // we only reach here if the loop is over
                 testLoops.remove(testFile);
                 break;
-
+            case "sto":
+                command = stripJavaComments(statement).trim().split(" ");
+                executeStorageCommand(command);
+                break;
             default:
                 throw new IllegalArgumentException("No statements of type " + ctx.getType());
         }
@@ -1510,15 +1524,26 @@
     }
 
     protected URI createEndpointURI(String path, String query) throws URISyntaxException {
-        int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
-        InetSocketAddress endpoint = endpoints.get(endpointIdx);
+        InetSocketAddress endpoint;
+        if (!path.startsWith("nc:")) {
+            int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
+            endpoint = endpoints.get(endpointIdx);
+        } else {
+            final String[] tokens = path.split(" ");
+            if (tokens.length != 2) {
+                throw new IllegalArgumentException("Unrecognized http pattern");
+            }
+            String nodeId = tokens[0].substring(3);
+            endpoint = getNcEndPoint(nodeId);
+            path = tokens[1];
+        }
         URI uri = new URI("http", null, endpoint.getHostString(), endpoint.getPort(), path, query, null);
         LOGGER.fine("Created endpoint URI: " + uri);
         return uri;
     }
 
     public URI getEndpoint(String servlet) throws URISyntaxException {
-        return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), null);
+        return createEndpointURI(Servlets.getAbsolutePath(getPath(servlet)), null);
     }
 
     public static String stripJavaComments(String text) {
@@ -1622,6 +1647,41 @@
         LOGGER.info("Cluster state now " + desiredState);
     }
 
+    private void executeStorageCommand(String[] command) throws Exception {
+        String srcNode = command[0];
+        String api = command[1];
+        final URI endpoint = getEndpoint(srcNode + " " + Servlets.getAbsolutePath(Servlets.STORAGE) + api);
+        String partition = command[2];
+        String destNode = command[3];
+        final InetSocketAddress destAddress = getNcReplicationAddress(destNode);
+        List<Parameter> parameters = new ArrayList<>(3);
+        Stream.of("partition", "host", "port").forEach(arg -> {
+            Parameter p = new Parameter();
+            p.setName(arg);
+            parameters.add(p);
+        });
+        parameters.get(0).setValue(partition);
+        parameters.get(1).setValue(destAddress.getHostName());
+        parameters.get(2).setValue(String.valueOf(destAddress.getPort()));
+        final HttpUriRequest httpUriRequest = constructPostMethod(endpoint, parameters);
+        final HttpResponse httpResponse = executeHttpRequest(httpUriRequest);
+        Assert.assertEquals(HttpStatus.SC_OK, httpResponse.getStatusLine().getStatusCode());
+    }
+
+    private InetSocketAddress getNcEndPoint(String nodeId) {
+        if (ncEndPoints == null || !ncEndPoints.containsKey(nodeId)) {
+            throw new IllegalStateException("No end point specified for node: " + nodeId);
+        }
+        return ncEndPoints.get(nodeId);
+    }
+
+    private InetSocketAddress getNcReplicationAddress(String nodeId) {
+        if (replicationAddress == null || !replicationAddress.containsKey(nodeId)) {
+            throw new IllegalStateException("No replication address specified for node: " + nodeId);
+        }
+        return replicationAddress.get(nodeId);
+    }
+
     abstract static class TestLoop extends Exception {
 
         private final String target;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
new file mode 100644
index 0000000..56c7bc0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class ReplicationExecutionTest {
+    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    private static final TestExecutor testExecutor = new TestExecutor();
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+        final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+        Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+        Map<String, InetSocketAddress> replicationAddress = new HashMap<>();
+        final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+        for (NodeControllerService nc : ncs) {
+            final String nodeId = nc.getId();
+            final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+            int apiPort = appCtx.getExternalProperties().getNcApiPort();
+            int replicationPort = appCtx.getReplicationProperties().getDataReplicationPort(nodeId);
+            ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+            replicationAddress.put(nodeId, InetSocketAddress.createUnresolved(ip, replicationPort));
+        }
+        testExecutor.setNcEndPoints(ncEndPoints);
+        testExecutor.setNcReplicationAddress(replicationAddress);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "ReplicationExecutionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests("replication.xml", "replication.xml");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public ReplicationExecutionTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        LangExecutionUtil.test(tcCtx);
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
new file mode 100644
index 0000000..7ddaa20
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /addReplica 0 asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
new file mode 100644
index 0000000..d287fad
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /admin/storage/partition/0
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
new file mode 100644
index 0000000..a635676
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
@@ -0,0 +1,28 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
+            QueryFileExtension=".sqlpp">
+  <test-group name="replication">
+    <test-case FilePath="replication">
+      <compilation-unit name="add_replica">
+        <output-dir compare="Text">add_replica</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+</test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
new file mode 100644
index 0000000..3553d9c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
@@ -0,0 +1,7 @@
+[ {
+  "partition" : 0,
+  "replicas" : [ {
+    "location" : "127.0.0.1:2017",
+    "status" : "DISCONNECTED"
+  } ]
+} ]
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 0503c09..28be6fa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -29,8 +29,7 @@
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -119,7 +118,7 @@
     @Override
     INCServiceContext getServiceContext();
 
-    IStorageSubsystem getStorageSubsystem();
-
     IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
+
+    IReplicaManager getReplicaManager();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
new file mode 100644
index 0000000..034d668
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
@@ -0,0 +1,26 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.asterix.common.exceptions;
+
+public class ReplicationException extends RuntimeException {
+
+    public ReplicationException(Throwable cause) {
+        super(cause);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
new file mode 100644
index 0000000..5a9dc3f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+
+public interface IPartitionReplica {
+
+    enum PartitionReplicaStatus {
+        /* replica is in-sync with master */
+        IN_SYNC,
+        /* replica is still catching up with master */
+        CATCHING_UP,
+        /* replica is not connected with master */
+        DISCONNECTED
+    }
+
+    /**
+     * Gets the status of a replica.
+     *
+     * @return The status
+     */
+    PartitionReplicaStatus getStatus();
+
+    /**
+     * Gets the identifier of a replica
+     *
+     * @return The identifier
+     */
+    ReplicaIdentifier getIdentifier();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
similarity index 90%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index b4f06cb..a3b2b50 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -21,7 +21,9 @@
 import java.util.List;
 import java.util.Set;
 
-public interface IStorageSubsystem {
+import org.apache.asterix.common.replication.IPartitionReplica;
+
+public interface IReplicaManager {
 
     /**
      * Adds a replica with the specified {@code id}
@@ -43,7 +45,7 @@
      * @param partition
      * @return The list of replicas
      */
-    List<PartitionReplica> getReplicas(int partition);
+    List<IPartitionReplica> getReplicas(int partition);
 
     /**
      * Gets the list of partition to which the current node is
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index d5f23bf..1ac3ffa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -46,4 +46,8 @@
 
     private Servlets() {
     }
+
+    public static String getAbsolutePath(String servlet) {
+        return servlet.replaceAll("/\\*$", "");
+    }
 }
diff --git a/asterixdb/asterix-replication/pom.xml b/asterixdb/asterix-replication/pom.xml
index f209aae..2b5fe0c 100644
--- a/asterixdb/asterix-replication/pom.xml
+++ b/asterixdb/asterix-replication/pom.xml
@@ -71,6 +71,14 @@
       <artifactId>asterix-transactions</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
new file mode 100644
index 0000000..c6d1b60
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.storage;
+
+import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.CATCHING_UP;
+import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED;
+import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC;
+
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+@ThreadSafe
+public class PartitionReplica implements IPartitionReplica {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private final ReplicaIdentifier id;
+    private PartitionReplicaStatus status = DISCONNECTED;
+
+    public PartitionReplica(ReplicaIdentifier id) {
+        this.id = id;
+    }
+
+    @Override
+    public synchronized PartitionReplicaStatus getStatus() {
+        return status;
+    }
+
+    @Override
+    public ReplicaIdentifier getIdentifier() {
+        return id;
+    }
+
+    public synchronized void sync() {
+        if (status == IN_SYNC || status == CATCHING_UP) {
+            return;
+        }
+    }
+
+    public JsonNode asJson() {
+        ObjectNode json = OBJECT_MAPPER.createObjectNode();
+        json.put("id", id.toString());
+        json.put("state", status.name());
+        return json;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PartitionReplica that = (PartitionReplica) o;
+        return id.equals(that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return id.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        try {
+            return JSONUtil.convertNode(asJson());
+        } catch (JsonProcessingException e) {
+            throw new ReplicationException(e);
+        }
+    }
+}
\ No newline at end of file