checkpoint
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index d7c65c8..ab3516a 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -31,7 +31,6 @@
import edu.uci.ics.asterix.optimizer.rules.InlineUnnestFunctionRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceDynamicTypeCastRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceEnforcedListTypeRule;
-import edu.uci.ics.asterix.optimizer.rules.IntroduceFeedInterceptOperatorRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceInstantLockSearchCallbackRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
@@ -210,7 +209,6 @@
accessMethod.add(new RemoveUnusedOneToOneEquiJoinRule());
accessMethod.add(new PushSimilarityFunctionsBelowJoin());
accessMethod.add(new RemoveUnusedAssignAndAggregateRule());
- accessMethod.add(new IntroduceFeedInterceptOperatorRule());
return accessMethod;
}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
index 2953e42..091d1c1 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -180,53 +180,6 @@
return true;
}
- if (fid.equals(AsterixBuiltinFunctions.FEED_INTERCEPT)) {
- if (unnest.getPositionalVariable() != null) {
- throw new AlgebricksException("No positional variables are allowed over datasets.");
- }
- ILogicalExpression expr = f.getArguments().get(0).getValue();
- if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
- return false;
- }
- ConstantExpression ce = (ConstantExpression) expr;
- IAlgebricksConstantValue acv = ce.getValue();
- if (!(acv instanceof AsterixConstantValue)) {
- return false;
- }
- AsterixConstantValue acv2 = (AsterixConstantValue) acv;
- if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
- return false;
- }
- String datasetArg = ((AString) acv2.getObject()).getStringValue();
-
- AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
- Pair<String, String> datasetReference = parseDatasetReference(metadataProvider, datasetArg);
- String dataverseName = datasetReference.first;
- String datasetName = datasetReference.second;
- Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Could not find dataset " + datasetName);
- }
-
- if (dataset.getDatasetType() != DatasetType.FEED) {
- throw new IllegalArgumentException("invalid dataset type:" + dataset.getDatasetType());
- }
-
- AqlSourceId asid = new AqlSourceId(dataverseName, datasetName);
- ArrayList<LogicalVariable> v = new ArrayList<LogicalVariable>();
- v.add(unnest.getVariable());
-
- DataSourceScanOperator scan = new DataSourceScanOperator(v, createDummyFeedInterceptDataSource(asid,
- dataset, metadataProvider));
-
- List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
- scanInpList.addAll(unnest.getInputs());
- opRef.setValue(scan);
- addPrimaryKey(v, context);
- context.computeAndSetTypeEnvironmentForOperator(scan);
-
- return true;
- }
}
return false;
@@ -256,20 +209,6 @@
return extDataSource;
}
- private AqlDataSource createDummyFeedInterceptDataSource(AqlSourceId aqlId, Dataset dataset,
- AqlMetadataProvider metadataProvider) throws AlgebricksException {
- if (!aqlId.getDataverseName().equals(
- metadataProvider.getDefaultDataverse() == null ? null : metadataProvider.getDefaultDataverse()
- .getDataverseName())) {
- return null;
- }
- String tName = dataset.getItemTypeName();
- IAType itemType = metadataProvider.findType(dataset.getDataverseName(), tName);
- ExternalFeedDataSource extDataSource = new ExternalFeedDataSource(aqlId, dataset, itemType,
- AqlDataSource.AqlDataSourceType.FEED_INTERCEPT);
- return extDataSource;
- }
-
private Pair<String, String> parseDatasetReference(AqlMetadataProvider metadataProvider, String datasetArg)
throws AlgebricksException {
String[] datasetNameComponents = datasetArg.split("\\.");
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 921e93e..af53e51 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -19,7 +19,6 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -68,12 +67,9 @@
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
-import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -840,7 +836,7 @@
}
- private static class FeedsDeActivator implements Runnable {
+ public static class FeedsDeActivator implements Runnable {
private List<FeedInfo> feedsToTerminate;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
index 6eaf603..f7622be 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.hyracks.bootstrap;
import java.util.ArrayList;
@@ -14,19 +28,21 @@
import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailure;
import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailureReport;
import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedInfo;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedsDeActivator;
import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.hyracks.api.constraints.Constraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -103,7 +119,7 @@
}
}
JobSpecification spec = feedInfo.jobSpec;
- System.out.println("ALTERED Job Spec" + spec);
+ System.out.println("Altered Job Spec \n" + spec);
Thread.sleep(3000);
AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
}
@@ -112,18 +128,39 @@
String replacementNode = null;
switch (resp.getStatus()) {
case FAILURE:
- boolean computeNodeSubstitute = (feedInfo.computeLocations.contains(failedNodeId) && feedInfo.computeLocations
- .size() > 1);
- if (computeNodeSubstitute) {
- feedInfo.computeLocations.remove(failedNodeId);
- replacementNode = feedInfo.computeLocations.get(0);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Compute node:" + replacementNode + " chosen to replace " + failedNodeId);
+ // TODO 1st preference is given to any other participant node that is not involved in the feed.
+ // 2nd preference is given to a compute node.
+ // 3rd preference is given to a storage node
+ Set<String> participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
+ if (participantNodes != null && !participantNodes.isEmpty()) {
+ participantNodes.removeAll(feedInfo.storageLocations);
+ participantNodes.removeAll(feedInfo.computeLocations);
+ if (!participantNodes.isEmpty()) {
+ String[] participantNodesArray = AsterixClusterProperties.INSTANCE.getParticipantNodes()
+ .toArray(new String[] {});
+
+ replacementNode = participantNodesArray[new Random().nextInt(participantNodesArray.length)];
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Participant Node: " + replacementNode + " chosen as replacement for "
+ + failedNodeId);
+ }
}
- } else {
- replacementNode = feedInfo.storageLocations.get(0);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Storage node:" + replacementNode + " chosen to replace " + failedNodeId);
+ }
+
+ if (replacementNode == null) {
+ boolean computeNodeSubstitute = (feedInfo.computeLocations.contains(failedNodeId) && feedInfo.computeLocations
+ .size() > 1);
+ if (computeNodeSubstitute) {
+ feedInfo.computeLocations.remove(failedNodeId);
+ replacementNode = feedInfo.computeLocations.get(0);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Compute node:" + replacementNode + " chosen to replace " + failedNodeId);
+ }
+ } else {
+ replacementNode = feedInfo.storageLocations.get(0);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Storage node:" + replacementNode + " chosen to replace " + failedNodeId);
+ }
}
}
break;
@@ -137,11 +174,22 @@
break;
}
- replaceNode(feedInfo.jobSpec, failedNodeId, replacementNode);
+
+ if (replacementNode == null) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Unable to find replacement for failed node :" + failedNodeId);
+ LOGGER.severe("Feed: " + feedInfo.feedId + " will be terminated");
+ }
+ List<FeedInfo> feedsToTerminate = new ArrayList<FeedInfo>();
+ feedsToTerminate.add(feedInfo);
+ Thread t = new Thread(new FeedsDeActivator(feedsToTerminate));
+ t.start();
+ } else {
+ replaceNode(feedInfo.jobSpec, failedNodeId, replacementNode);
+ }
}
private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
- Map<OperatorDescriptorId, IOperatorDescriptor> opMap = jobSpec.getOperatorMap();
Set<Constraint> userConstraints = jobSpec.getUserConstraints();
List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
index 68d03bf..2129862 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
@@ -404,7 +404,14 @@
}
private static void replaceInJar(File sourceJar, String origFile, File replacementFile) throws IOException {
- File destJar = new File(sourceJar.getAbsolutePath() + ".modified");
+ String srcJarAbsPath = sourceJar.getAbsolutePath();
+ String srcJarSuffix = srcJarAbsPath.substring(srcJarAbsPath.lastIndexOf(File.separator) + 1);
+ String srcJarName = srcJarSuffix.split(".jar")[0];
+
+ String destJarName = srcJarName + "-managix";
+ String destJarSuffix = destJarName + ".jar";
+ File destJar = new File(sourceJar.getParentFile().getAbsolutePath() + File.separator + destJarSuffix);
+ // File destJar = new File(sourceJar.getAbsolutePath() + ".modified");
InputStream jarIs = null;
FileInputStream fis = new FileInputStream(replacementFile);
JarFile sourceJarFile = new JarFile(sourceJar);
@@ -434,7 +441,7 @@
jarIs.close();
sourceJar.delete();
destJar.renameTo(sourceJar);
- sourceJar.setExecutable(true);
+ destJar.setExecutable(true);
}
public static void dumpToFile(String dest, String content) throws IOException {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
index 5fee34f..2e2c432 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.metadata.cluster;
import java.io.File;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index f4b7f27..077b3f1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -96,12 +96,15 @@
try {
switch (dataset.getDatasetType()) {
case FEED:
+ datasourceType = AqlDataSourceType.FEED;
initFeedDataset(itemType, dataset);
break;
case INTERNAL:
+ datasourceType = AqlDataSourceType.INTERNAL;
initInternalDataset(itemType);
break;
case EXTERNAL: {
+ datasourceType = AqlDataSourceType.EXTERNAL;
initExternalDataset(itemType);
break;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index b0ddbbf..bbaa0a7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -62,7 +62,6 @@
import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedId;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedInterceptScanOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index 81fd78d..2fcd9bf 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -26,8 +27,6 @@
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
-import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
-import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -41,13 +40,13 @@
private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
- private static final String IO_DEVICES = "iodevices";
-
public static final AsterixClusterProperties INSTANCE = new AsterixClusterProperties();
+ public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
+
+ private static final String IO_DEVICES = "iodevices";
private Map<String, Map<String, String>> ncConfiguration = new HashMap<String, Map<String, String>>();
- public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
private final Cluster cluster;
private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
@@ -75,13 +74,13 @@
private State state = State.UNUSABLE;
- public void removeNCConfiguration(String nodeId) {
+ public synchronized void removeNCConfiguration(String nodeId) {
// state = State.UNUSABLE;
ncConfiguration.remove(nodeId);
resetClusterPartitionConstraint();
}
- public void addNCConfiguration(String nodeId, Map<String, String> configuration) {
+ public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
ncConfiguration.put(nodeId, configuration);
if (ncConfiguration.keySet().size() == AsterixAppContextInfo.getInstance().getMetadataProperties()
.getNodeNames().size()) {
@@ -102,7 +101,7 @@
* is not valid if it does not correspond to the set of registered
* Node Controllers.
*/
- public int getNumberOfIODevices(String nodeId) {
+ public synchronized int getNumberOfIODevices(String nodeId) {
Map<String, String> ncConfig = ncConfiguration.get(nodeId);
if (ncConfig == null) {
if (LOGGER.isLoggable(Level.WARNING)) {
@@ -127,14 +126,18 @@
return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
}
- public AlgebricksPartitionConstraint getClusterLocations() {
+ public synchronized Set<String> getParticipantNodes() {
+ return ncConfiguration.keySet();
+ }
+
+ public synchronized AlgebricksPartitionConstraint getClusterLocations() {
if (clusterPartitionConstraint == null) {
resetClusterPartitionConstraint();
}
return clusterPartitionConstraint;
}
- private void resetClusterPartitionConstraint() {
+ private synchronized void resetClusterPartitionConstraint() {
Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
ArrayList<String> locs = new ArrayList<String>();
for (String i : stores.keySet()) {
@@ -150,4 +153,65 @@
cluster = locs.toArray(cluster);
clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(cluster);
}
+
+ private static class AsterixCluster {
+
+ private final String asterixInstance;
+ private Map<String, AsterixNode> asterixNodes;
+
+ public AsterixCluster(Cluster cluster) {
+ asterixInstance = cluster.getInstanceName();
+ asterixNodes = new HashMap<String, AsterixNode>();
+ for (Node node : cluster.getNode()) {
+ AsterixNode aNode = new AsterixNode(node, AsterixNode.NodeRole.PARTICIPANT,
+ AsterixNode.NodeState.INACTIVE);
+ asterixNodes.put(asterixInstance + "_" + node.getId(), aNode);
+ }
+
+ for (Node node : cluster.getSubstituteNodes().getNode()) {
+ AsterixNode aNode = new AsterixNode(node, AsterixNode.NodeRole.SUBSTITUTE,
+ AsterixNode.NodeState.INACTIVE);
+ asterixNodes.put(asterixInstance + "_" + node.getId(), aNode);
+ }
+ }
+
+ private static class AsterixNode {
+
+ private final Node node;
+ private NodeRole role;
+ private NodeState state;
+
+ public enum NodeRole {
+ PARTICIPANT,
+ SUBSTITUTE
+ }
+
+ public enum NodeState {
+ ACTIVE,
+ INACTIVE
+ }
+
+ public AsterixNode(Node node, NodeRole role, NodeState state) {
+ this.node = node;
+ this.role = role;
+ this.state = state;
+ }
+
+ @Override
+ public String toString() {
+ return node.getId() + "_" + role + "_" + state;
+ }
+ }
+
+ public void notifyChangeState(String nodeId, AsterixNode.NodeRole newRole, AsterixNode.NodeState newState) {
+ AsterixNode node = asterixNodes.get(nodeId);
+ if (node != null) {
+ node.role = newRole;
+ node.state = newState;
+ } else {
+ throw new IllegalStateException("Unknown nodeId" + nodeId);
+ }
+
+ }
+ }
}