[NO ISSUE] Add support for NodeGroup upsert, etc.
Also, enable resource id generation when only pending removal nodes are absent
from the cluster
Change-Id: I15cfb74bc345680102cedafa99f7ff4f144860bc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2389
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index d239038..a3ad089 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -163,7 +163,7 @@
private void processPartitionMaster(IServletRequest request, IServletResponse response) {
final String partition = request.getParameter("partition");
final String node = request.getParameter("node");
- appCtx.getClusterStateManager().updateClusterPartition(Integer.valueOf(partition), node, true);
+ appCtx.getClusterStateManager().updateClusterPartition(Integer.parseInt(partition), node, true);
response.setStatus(HttpResponseStatus.OK);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 0a5707e..dda9ffd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -66,7 +66,7 @@
/**
* Updates the active node and active state of the cluster partition with id {@code partitionNum}
*/
- void updateClusterPartition(Integer partitionNum, String activeNode, boolean active);
+ void updateClusterPartition(int partitionNum, String activeNode, boolean active);
/**
* Updates the metadata node id and its state.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index a12b079..b994c50 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -191,12 +190,9 @@
}
}
- public NodeGroup addNodeGroupIfNotExists(NodeGroup nodeGroup) {
+ public NodeGroup addOrUpdateNodeGroup(NodeGroup nodeGroup) {
synchronized (nodeGroups) {
- if (!nodeGroups.containsKey(nodeGroup.getNodeGroupName())) {
- return nodeGroups.put(nodeGroup.getNodeGroupName(), nodeGroup);
- }
- return null;
+ return nodeGroups.put(nodeGroup.getNodeGroupName(), nodeGroup);
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index b2757f2..a5e3a84 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -57,6 +57,7 @@
import org.apache.asterix.metadata.entities.Node;
import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -480,8 +481,18 @@
@Override
public void addNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException {
+ modifyNodegroup(ctx, nodeGroup, Operation.INSERT);
+ }
+
+ @Override
+ public void upsertNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException {
+ modifyNodegroup(ctx, nodeGroup, Operation.UPSERT);
+ }
+
+ public void modifyNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup, Operation op)
+ throws AlgebricksException {
try {
- metadataNode.addNodeGroup(ctx.getTxnId(), nodeGroup);
+ metadataNode.modifyNodeGroup(ctx.getTxnId(), nodeGroup, op);
} catch (RemoteException e) {
throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 966c99a..64d0389 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -404,11 +404,12 @@
}
@Override
- public void addNodeGroup(TxnId txnId, NodeGroup nodeGroup) throws AlgebricksException, RemoteException {
+ public void modifyNodeGroup(TxnId txnId, NodeGroup nodeGroup, Operation modificationOp)
+ throws AlgebricksException, RemoteException {
try {
NodeGroupTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getNodeGroupTupleTranslator(true);
ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(nodeGroup);
- insertTupleIntoIndex(txnId, MetadataPrimaryIndexes.NODEGROUP_DATASET, tuple);
+ modifyMetadataIndex(modificationOp, txnId, MetadataPrimaryIndexes.NODEGROUP_DATASET, tuple);
} catch (HyracksDataException e) {
if (e.getComponent().equals(ErrorCode.HYRACKS) && e.getErrorCode() == ErrorCode.DUPLICATE_KEY) {
throw new AlgebricksException(
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index cb67ee5..961a1ee 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -152,7 +152,7 @@
public void dropNodeGroup(String nodeGroupName) {
NodeGroup nodeGroup = new NodeGroup(nodeGroupName, null);
- droppedCache.addNodeGroupIfNotExists(nodeGroup);
+ droppedCache.addOrUpdateNodeGroup(nodeGroup);
logAndApply(new MetadataLogicalOperation(nodeGroup, false));
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index e030db3..ff349e1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -320,18 +320,29 @@
throws AlgebricksException;
/**
- * Inserts a node group.
+ * Inserts a new node group.
*
* @param ctx
* MetadataTransactionContext of an active metadata transaction.
* @param nodeGroup
* Node group instance to insert.
* @throws AlgebricksException
- * For example, if the node group already exists.
+ * For example, if the node group already exists
*/
void addNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException;
/**
+ * Inserts a new (or updates an existing) node group.
+ *
+ * @param ctx
+ * MetadataTransactionContext of an active metadata transaction.
+ * @param nodeGroup
+ * Node group instance to insert or update.
+ * @throws AlgebricksException
+ */
+ void upsertNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException;
+
+ /**
* Retrieves a node group.
*
* @param ctx
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index f6abc53..dc23db4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -41,6 +41,7 @@
import org.apache.asterix.metadata.entities.Library;
import org.apache.asterix.metadata.entities.Node;
import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
/**
@@ -340,11 +341,14 @@
* A globally unique id for an active metadata transaction.
* @param nodeGroup
* Node group instance to insert.
+ * @param modificationOp
* @throws AlgebricksException
* For example, if the node group already exists.
* @throws RemoteException
*/
- void addNodeGroup(TxnId txnId, NodeGroup nodeGroup) throws AlgebricksException, RemoteException;
+ void modifyNodeGroup(TxnId txnId, NodeGroup nodeGroup,
+ AbstractIndexModificationOperationCallback.Operation modificationOp)
+ throws AlgebricksException, RemoteException;
/**
* Retrieves a node group, acquiring local locks on behalf of the given
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
index e5088aa..8a7d5ec 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
@@ -50,7 +50,7 @@
@Override
public NodeGroup addToCache(MetadataCache cache) {
- return cache.addNodeGroupIfNotExists(this);
+ return cache.addOrUpdateNodeGroup(this);
}
@Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 3e172c7..087913f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -41,16 +41,16 @@
ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
ResourceIdRequestResponseMessage response = new ResourceIdRequestResponseMessage();
IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
- if (!clusterStateManager.isClusterActive()) {
- response.setResourceId(-1);
- response.setException(new Exception("Cannot generate global resource id when cluster is not active."));
- } else {
- IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
- response.setResourceId(resourceIdManager.createResourceId());
- if (response.getResourceId() < 0) {
+ IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
+ response.setResourceId(resourceIdManager.createResourceId());
+ if (response.getResourceId() < 0) {
+ if (!(clusterStateManager.isClusterActive())) {
+ response.setException(
+ new Exception("Cannot generate global resource id when cluster is not active."));
+ } else {
response.setException(new Exception("One or more nodes has not reported max resource id."));
+ requestMaxResourceID(clusterStateManager, resourceIdManager, broker);
}
- requestMaxResourceID(clusterStateManager, resourceIdManager, broker);
}
broker.sendApplicationMessageToNC(response, src);
} catch (Exception e) {
@@ -60,7 +60,7 @@
private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager,
ICCMessageBroker broker) throws Exception {
- Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes();
+ Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes(true);
ReportLocalCountersRequestMessage msg = new ReportLocalCountersRequestMessage();
for (String nodeId : getParticipantNodes) {
if (!resourceIdManager.reported(nodeId)) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 0bb862d..6d3077e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -38,7 +38,8 @@
@Override
public long createResourceId() {
- return csm.isClusterActive() ? globalResourceId.incrementAndGet() : -1;
+ return csm.isClusterActive() || reportedNodes.containsAll(csm.getParticipantNodes(true))
+ ? globalResourceId.incrementAndGet() : -1;
}
@Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 76668d2..03a6868 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -150,7 +150,7 @@
}
@Override
- public synchronized void updateClusterPartition(Integer partitionNum, String activeNode, boolean active) {
+ public synchronized void updateClusterPartition(int partitionNum, String activeNode, boolean active) {
ClusterPartition clusterPartition = clusterPartitions.get(partitionNum);
if (clusterPartition != null) {
// set the active node for this node's partitions
@@ -159,6 +159,7 @@
clusterPartition.setActiveNodeId(activeNode);
clusterPartition.setPendingActivation(false);
}
+ notifyAll();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 8f73864..77e7bf7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -101,7 +101,7 @@
// Updates the node registry.
if (nodeRegistry.containsKey(nodeId)) {
LOGGER.warn("Node with name " + nodeId + " has already registered; failing the node then re-registering.");
- failNonDeadNode(nodeId);
+ failNode(nodeId);
} else {
try {
// TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
@@ -173,7 +173,7 @@
return Pair.of(deadNodes, affectedJobIds);
}
- private void failNonDeadNode(String nodeId) throws HyracksException {
+ public void failNode(String nodeId) throws HyracksException {
NodeControllerState state = nodeRegistry.get(nodeId);
Set<JobId> affectedJobIds = state.getActiveJobIds();
// Removes the node from node map.