checkpoint
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 9fbb24a..9e628c2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -568,14 +568,20 @@
if (allNodesNodegroup) {
nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
} else {
- Random random = new Random();
Set<String> nodeNames = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
+ String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
+ List<String> selectedNodes = new ArrayList<String>();
+ selectedNodes.add(metadataNodeName);
+ nodeNames.remove(metadataNodeName);
+ nodegroupCardinality--;
+
+ Random random = new Random();
String[] nodes = nodeNames.toArray(new String[] {});
int[] b = new int[nodeNames.size()];
for (int i = 0; i < b.length; i++) {
b[i] = i;
}
- List<String> selectedNodes = new ArrayList<String>();
+
for (int i = 0; i < nodegroupCardinality; i++) {
int selected = i + random.nextInt(nodeNames.size() - i);
int selNodeIndex = b[selected];
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 4f355f8..cebc710 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -14,16 +14,26 @@
*/
package edu.uci.ics.asterix.hyracks.bootstrap;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
+import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse.Status;
+import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWorkResponse;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.hyracks.api.application.IClusterLifecycleListener;
@@ -31,15 +41,26 @@
private static final Logger LOGGER = Logger.getLogger(ClusterLifecycleListener.class.getName());
+ private static final LinkedBlockingQueue<Set<IClusterManagementWork>> workRequestQueue = new LinkedBlockingQueue<Set<IClusterManagementWork>>();
+
+ private static ClusterWorkExecutor eventHandler = new ClusterWorkExecutor(workRequestQueue);
+
+ private static List<IClusterManagementWorkResponse> pendingWorkResponses = new ArrayList<IClusterManagementWorkResponse>();
+
public static ClusterLifecycleListener INSTANCE = new ClusterLifecycleListener();
private ClusterLifecycleListener() {
Thread t = new Thread(eventHandler);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting cluster event handler");
+ }
t.start();
}
- private static final LinkedBlockingQueue<Set<IClusterManagementWork>> outbox = new LinkedBlockingQueue<Set<IClusterManagementWork>>();
- private static ClusterEventHandler eventHandler = new ClusterEventHandler(outbox);
+ public enum ClusterEventType {
+ NODE_JOIN,
+ NODE_FAILURE
+ }
@Override
public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) {
@@ -47,6 +68,20 @@
LOGGER.info("NC: " + nodeId + " joined");
}
AsterixClusterProperties.INSTANCE.addNCConfiguration(nodeId, ncConfiguration);
+ Set<String> nodeAddition = new HashSet<String>();
+ nodeAddition.add(nodeId);
+ updateProgress(ClusterEventType.NODE_JOIN, nodeAddition);
+ Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();
+ Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
+ for (IClusterEventsSubscriber sub : subscribers) {
+ Set<IClusterManagementWork> workRequest = sub.notifyNodeJoin(nodeId);
+ if (workRequest != null && !workRequest.isEmpty()) {
+ work.addAll(workRequest);
+ }
+ }
+ if (!work.isEmpty()) {
+ executeWorkSet(work);
+ }
}
@@ -57,19 +92,112 @@
}
AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
}
+ updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds);
Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();
Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
for (IClusterEventsSubscriber sub : subscribers) {
- work.addAll(sub.notifyNodeFailure(deadNodeIds));
+ Set<IClusterManagementWork> workRequest = sub.notifyNodeFailure(deadNodeIds);
+ if (workRequest != null && !workRequest.isEmpty()) {
+ work.addAll(workRequest);
+ }
}
+ if (!work.isEmpty()) {
+ executeWorkSet(work);
+ }
+ }
+ private void submitWork(Set<IClusterManagementWork> work) {
try {
- outbox.put(work);
+ workRequestQueue.put(work);
} catch (InterruptedException e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Interrupted :" + e.getMessage());
}
}
+ }
+
+ private void updateProgress(ClusterEventType eventType, Set<String> nodeIds) {
+ List<IClusterManagementWorkResponse> completedResponses = new ArrayList<IClusterManagementWorkResponse>();
+ boolean isComplete = false;
+ for (IClusterManagementWorkResponse resp : pendingWorkResponses) {
+ switch (eventType) {
+ case NODE_FAILURE:
+                    isComplete = resp instanceof RemoveNodeWorkResponse && ((RemoveNodeWorkResponse) resp).updateProgress(nodeIds);
+ if (isComplete) {
+ resp.setStatus(Status.SUCCESS);
+ resp.getWork().getSourceSubscriber().notifyRequestCompletion(resp);
+ completedResponses.add(resp);
+ }
+ break;
+
+ case NODE_JOIN:
+                    isComplete = resp instanceof AddNodeWorkResponse && ((AddNodeWorkResponse) resp).updateProgress(nodeIds.iterator().next());
+ if (isComplete) {
+ resp.setStatus(Status.SUCCESS);
+ resp.getWork().getSourceSubscriber().notifyRequestCompletion(resp);
+ completedResponses.add(resp);
+ }
+ break;
+ }
+ }
+ pendingWorkResponses.removeAll(completedResponses);
+ }
+
+ private void executeWorkSet(Set<IClusterManagementWork> workSet) {
+ int nodesToAdd = 0;
+ Set<String> nodesToRemove = new HashSet<String>();
+ Set<AddNodeWork> nodeAdditionRequests = new HashSet<AddNodeWork>();
+ Set<IClusterManagementWork> nodeRemovalRequests = new HashSet<IClusterManagementWork>();
+ for (IClusterManagementWork w : workSet) {
+ switch (w.getClusterManagementWorkType()) {
+ case ADD_NODE:
+ if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodes()) {
+ nodesToAdd = ((AddNodeWork) w).getNumberOfNodes();
+ }
+ nodeAdditionRequests.add((AddNodeWork) w);
+ break;
+ case REMOVE_NODE:
+ nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
+ nodeRemovalRequests.add(w);
+ RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w, Status.IN_PROGRESS);
+ pendingWorkResponses.add(response);
+ break;
+ }
+ }
+
+ List<String> addedNodes = new ArrayList<String>();
+ String asterixInstanceName = AsterixClusterProperties.INSTANCE.getCluster().getInstanceName();
+ for (int i = 0; i < nodesToAdd; i++) {
+ Node node = AsterixClusterProperties.INSTANCE.getAvailableSubstitutionNode();
+ if (node != null) {
+ try {
+ ClusterManager.INSTANCE.addNode(node);
+ addedNodes.add(asterixInstanceName + "_" + node.getId());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Added NC at:" + node.getId());
+ }
+ } catch (AsterixException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to add NC at:" + node.getId());
+ }
+ e.printStackTrace();
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to add NC: no more available nodes");
+ }
+ }
+ }
+
+ for (AddNodeWork w : nodeAdditionRequests) {
+ int n = w.getNumberOfNodes();
+ List<String> nodesToBeAddedForWork = new ArrayList<String>();
+            for (int i = 0; i < n && i < addedNodes.size(); i++) { // guard: fewer nodes may have been added than requested
+ nodesToBeAddedForWork.add(addedNodes.get(i));
+ }
+ AddNodeWorkResponse response = new AddNodeWorkResponse(w, nodesToBeAddedForWork);
+ pendingWorkResponses.add(response);
+ }
}
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterEventHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
similarity index 91%
rename from asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterEventHandler.java
rename to asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
index 091cf4b..a6c1f8b 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterEventHandler.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -30,16 +30,17 @@
import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse.Status;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-public class ClusterEventHandler implements Runnable {
+public class ClusterWorkExecutor implements Runnable {
- private static final Logger LOGGER = Logger.getLogger(ClusterEventHandler.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(ClusterWorkExecutor.class.getName());
private final LinkedBlockingQueue<Set<IClusterManagementWork>> inbox;
- public ClusterEventHandler(LinkedBlockingQueue<Set<IClusterManagementWork>> inbox) {
+ public ClusterWorkExecutor(LinkedBlockingQueue<Set<IClusterManagementWork>> inbox) {
this.inbox = inbox;
}
-
+
+
@Override
public void run() {
while (true) {
@@ -86,11 +87,7 @@
}
}
}
-
- for (IClusterManagementWork w : nodeAdditionRequests) {
- w.getSourceSubscriber().notifyRequestCompletion(
- new AddNodeWorkResponse((AddNodeWork) w, Status.SUCCESS, addedNodes));
- }
+
} catch (InterruptedException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index e02cbba..a008c1d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -16,6 +16,7 @@
package edu.uci.ics.asterix.metadata;
import java.rmi.RemoteException;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -659,4 +660,15 @@
return FeedPolicy;
}
+ @Override
+ public Collection<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx) throws MetadataException {
+ Collection<FeedActivity> feedActivities = null;
+ try {
+ feedActivities = metadataNode.getActiveFeeds(ctx.getJobId());
+ } catch (RemoteException e) {
+ throw new MetadataException(e);
+ }
+ return feedActivities;
+ }
+
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 0398777..b180063 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -17,8 +17,11 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
@@ -37,7 +40,6 @@
import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
import edu.uci.ics.asterix.metadata.api.IMetadataNode;
import edu.uci.ics.asterix.metadata.api.IValueExtractor;
-import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataSecondaryIndexes;
import edu.uci.ics.asterix.metadata.entities.Dataset;
@@ -1241,4 +1243,34 @@
throw new MetadataException(e);
}
}
+
+ @Override
+ public Collection<FeedActivity> getActiveFeeds(JobId jobId) throws MetadataException, RemoteException {
+ try {
+ ITupleReference searchKey = createTuple();
+ FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
+ List<FeedActivity> results = new ArrayList<FeedActivity>();
+ IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
+ tupleReaderWriter);
+ searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
+ Map<FeedId, FeedActivity> initiatedFeeds = new HashMap<FeedId, FeedActivity>();
+ FeedId fid = null;
+ for (FeedActivity fa : results) {
+ switch (fa.getActivityType()) {
+ case FEED_BEGIN:
+ fid = new FeedId(fa.getDataverseName(), fa.getDatasetName());
+ initiatedFeeds.put(fid, fa);
+ break;
+ case FEED_FAILURE:
+ case FEED_END:
+ fid = new FeedId(fa.getDataverseName(), fa.getDatasetName());
+ initiatedFeeds.remove(fid);
+ break;
+ }
+ }
+ return initiatedFeeds.values();
+ } catch (Exception e) {
+ throw new MetadataException(e);
+ }
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
index 39f9927..049a45c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
@@ -17,6 +17,7 @@
import java.util.Set;
import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
public interface IClusterEventsSubscriber {
@@ -32,6 +33,15 @@
*/
public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId);
+ /**
+ * @param response
+ */
public void notifyRequestCompletion(IClusterManagementWorkResponse response);
+ /**
+ * @param previousState
+ * @param newState
+ */
+ public void notifyStateChange(State previousState, State newState);
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 5f1acf8..a7833d2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -16,6 +16,7 @@
package edu.uci.ics.asterix.metadata.api;
import java.rmi.RemoteException;
+import java.util.Collection;
import java.util.List;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -469,9 +470,23 @@
*/
public void addFeedPolicy(MetadataTransactionContext ctx, FeedPolicy policy) throws MetadataException;
+ /**
+ * @param ctx
+ * @param dataverse
+ * @param policyName
+ * @return
+ * @throws MetadataException
+ */
public FeedPolicy getFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName)
throws MetadataException;
+ /**
+ * @param ctx
+ * @return
+ * @throws MetadataException
+ */
+ public Collection<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx) throws MetadataException;
+
public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
public int getMostRecentDatasetId() throws MetadataException;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index cf5f41e..9ab483d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -18,6 +18,7 @@
import java.io.Serializable;
import java.rmi.Remote;
import java.rmi.RemoteException;
+import java.util.Collection;
import java.util.List;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -524,4 +525,11 @@
public FeedPolicy getFeedPolicy(JobId jobId, String dataverse, String policy) throws MetadataException,
RemoteException;
+ /**
+ * @param jobId
+ * @return
+ * @throws MetadataException
+ * @throws RemoteException
+ */
+ public Collection<FeedActivity> getActiveFeeds(JobId jobId) throws MetadataException, RemoteException;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWorkResponse.java
index fca9990..40999f0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWorkResponse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWorkResponse.java
@@ -14,21 +14,28 @@
*/
package edu.uci.ics.asterix.metadata.cluster;
-import java.util.Set;
-
-import edu.uci.ics.asterix.event.schema.cluster.Node;
+import java.util.ArrayList;
+import java.util.List;
public class AddNodeWorkResponse extends ClusterManagementWorkResponse {
- private final Set<Node> nodesAdded;
+ private final List<String> nodesToBeAdded;
+ private final List<String> nodesAdded;
- public AddNodeWorkResponse(AddNodeWork w, Status status, Set<Node> nodesAdded) {
- super(w, status);
- this.nodesAdded = nodesAdded;
+ public AddNodeWorkResponse(AddNodeWork w, List<String> nodesToBeAdded) {
+ super(w);
+ this.nodesToBeAdded = nodesToBeAdded;
+ this.nodesAdded = new ArrayList<String>();
}
- public Set<Node> getNodesAdded() {
+ public List<String> getNodesAdded() {
return nodesAdded;
}
+ public boolean updateProgress(String addedNode) {
+ nodesToBeAdded.remove(addedNode);
+ nodesAdded.add(addedNode);
+ return nodesToBeAdded.isEmpty();
+ }
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
index 5970978..d578a77 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
@@ -6,11 +6,11 @@
protected final IClusterManagementWork work;
- protected final Status status;
+ protected Status status;
- public ClusterManagementWorkResponse(IClusterManagementWork w, Status status) {
+ public ClusterManagementWorkResponse(IClusterManagementWork w) {
this.work = w;
- this.status = status;
+ this.status = Status.IN_PROGRESS;
}
@Override
@@ -23,4 +23,9 @@
return status;
}
+ @Override
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
index 1ac9e34..dfc88ac 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
@@ -5,6 +5,7 @@
public interface IClusterManagementWorkResponse {
public enum Status {
+ IN_PROGRESS,
SUCCESS,
FAILURE
}
@@ -19,4 +20,9 @@
*/
public Status getStatus();
+ /**
+ * @param status
+ */
+ public void setStatus(Status status);
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWorkResponse.java
index 48445ef..58ea05e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWorkResponse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWorkResponse.java
@@ -14,19 +14,21 @@
*/
package edu.uci.ics.asterix.metadata.cluster;
+import java.util.HashSet;
import java.util.Set;
public class RemoveNodeWorkResponse extends ClusterManagementWorkResponse {
- private final Set<String> nodesRemoved;
+ private Set<String> nodesToBeRemoved = new HashSet<String>();
- public RemoveNodeWorkResponse(AddNodeWork w, Status status, Set<String> nodesRemoved) {
- super(w, status);
- this.nodesRemoved = nodesRemoved;
+ public RemoveNodeWorkResponse(RemoveNodeWork w, Status status) {
+ super(w);
+ nodesToBeRemoved.addAll(w.getNodesToBeRemoved());
}
- public Set<String> getNodesAdded() {
- return nodesRemoved;
- }
+ public boolean updateProgress(Set<String> failedNodeIds) {
+ nodesToBeRemoved.removeAll(failedNodeIds);
+ return nodesToBeRemoved.isEmpty();
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
index 257c8fd..c436fde 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
@@ -24,40 +24,30 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
-import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure.FailureType;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.constraints.Constraint;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -79,21 +69,21 @@
public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
private LinkedBlockingQueue<Message> jobEventInbox;
- private LinkedBlockingQueue<FeedFailureReport> failureEventInbox;
- private Map<Integer, FeedFailureReport> feedsWaitingForResponse = new HashMap<Integer, FeedFailureReport>();
+ private LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
private FeedLifecycleListener() {
jobEventInbox = new LinkedBlockingQueue<Message>();
feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
- failureEventInbox = new LinkedBlockingQueue<FeedFailureReport>();
- feedFailureNotificationHandler = new FeedFailureHandler(failureEventInbox);
+ responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
+ feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
+
new Thread(feedJobNotificationHandler).start();
- new Thread(feedFailureNotificationHandler).start();
+ new Thread(feedWorkRequestResponseHandler).start();
ClusterManager.INSTANCE.registerSubscriber(this);
}
private final FeedJobNotificationHandler feedJobNotificationHandler;
- private final FeedFailureHandler feedFailureNotificationHandler;
+ private final FeedWorkRequestResponseHandler feedWorkRequestResponseHandler;
@Override
public void notifyJobStart(JobId jobId) throws HyracksException {
@@ -392,7 +382,7 @@
AddNodeWork addNodesWork = new AddNodeWork(failureMap.keySet().size(), this);
work.add(addNodesWork);
- feedsWaitingForResponse.put(addNodesWork.getWorkId(), failureReport);
+ feedWorkRequestResponseHandler.registerFeedWork(addNodesWork.getWorkId(), failureReport);
return work;
}
@@ -421,133 +411,35 @@
@Override
public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
- IClusterManagementWork submittedWork = response.getWork();
- switch (submittedWork.getClusterManagementWorkType()) {
- case ADD_NODE:
- AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
- switch (resp.getStatus()) {
- case FAILURE:
- break;
- case SUCCESS:
- AddNodeWork work = (AddNodeWork) submittedWork;
- FeedFailureReport failureReport = feedsWaitingForResponse.remove(work.getWorkId());
- Set<FeedInfo> affectedFeeds = failureReport.failures.keySet();
- for (FeedInfo feedInfo : affectedFeeds) {
- try {
- recoverFeed(feedInfo, resp, failureReport.failures.get(feedInfo));
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Recovered feed:" + feedInfo);
- }
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Unable to recover feed:" + feedInfo);
- }
- }
- }
- break;
- }
- resp.getNodesAdded();
- break;
- case REMOVE_NODE:
- break;
+ try {
+ responseInbox.put(response);
+ } catch (InterruptedException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Interrupted exception");
+ }
}
}
- private void recoverFeed(FeedInfo feedInfo, AddNodeWorkResponse resp, List<FeedFailure> feedFailures)
- throws Exception {
- for (FeedFailure feedFailure : feedFailures) {
- switch (feedFailure.failureType) {
- case INGESTION_NODE:
- alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId);
- break;
- }
- }
- JobSpecification spec = feedInfo.jobSpec;
- AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
+ @Override
+ public void notifyStateChange(State previousState, State newState) {
+ switch(newState){
+ case ACTIVE:
+            if (previousState.equals(State.UNUSABLE)) {
+ resumeActiveFeeds();
+ }
+ break;
+ }
+
}
- private void alterFeedJobSpec(FeedInfo feedInfo, AddNodeWorkResponse resp, String failedNodeId) {
- Random r = new Random();
- Object[] rnodes = resp.getNodesAdded().toArray();
- Node replacementNode = (Node) rnodes[r.nextInt(rnodes.length)];
- String replacementNodeId = AsterixClusterProperties.INSTANCE.getCluster().getInstanceName() + "_"
- + replacementNode.getId();
- Map<OperatorDescriptorId, IOperatorDescriptor> opMap = feedInfo.jobSpec.getOperatorMap();
- Set<Constraint> userConstraints = feedInfo.jobSpec.getUserConstraints();
- List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
- List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
- List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
- Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
- Map<OperatorDescriptorId, List<String>> newConstraints = new HashMap<OperatorDescriptorId, List<String>>();
- OperatorDescriptorId opId = null;
- for (Constraint constraint : userConstraints) {
- LValueConstraintExpression lexpr = constraint.getLValue();
- ConstraintExpression cexpr = constraint.getRValue();
- switch (lexpr.getTag()) {
- case PARTITION_COUNT:
- opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
- if (modifiedOperators.contains(opId)) {
- countConstraintsToReplace.add(constraint);
- } else {
- List<Constraint> clist = candidateConstraints.get(opId);
- if (clist == null) {
- clist = new ArrayList<Constraint>();
- candidateConstraints.put(opId, clist);
- }
- clist.add(constraint);
- }
- break;
- case PARTITION_LOCATION:
- opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
- String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
- if (oldLocation.equals(failedNodeId)) {
- locationConstraintsToReplace.add(constraint);
- modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
- List<String> newLocs = newConstraints.get(opId);
- if (newLocs == null) {
- newLocs = new ArrayList<String>();
- newConstraints.put(opId, newLocs);
- }
- newLocs.add(replacementNodeId);
- } else {
- if (modifiedOperators.contains(opId)) {
- locationConstraintsToReplace.add(constraint);
- List<String> newLocs = newConstraints.get(opId);
- if (newLocs == null) {
- newLocs = new ArrayList<String>();
- newConstraints.put(opId, newLocs);
- }
- newLocs.add(oldLocation);
- } else {
- List<Constraint> clist = candidateConstraints.get(opId);
- if (clist == null) {
- clist = new ArrayList<Constraint>();
- candidateConstraints.put(opId, clist);
- }
- clist.add(constraint);
- }
- }
- break;
- }
+ private void resumeActiveFeeds() {
+ MetadataTransactionContext ctx = null;
+ try {
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ MetadataManager.INSTANCE.getActiveFeeds(ctx);
+ } catch (Exception e) {
+            // FIXME: exception swallowed and ctx is never committed/aborted — log the failure and abort the metadata transaction
}
-
- feedInfo.jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
- feedInfo.jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
-
- for (OperatorDescriptorId mopId : modifiedOperators) {
- List<Constraint> clist = candidateConstraints.get(mopId);
- if (clist != null && !clist.isEmpty()) {
- feedInfo.jobSpec.getUserConstraints().removeAll(clist);
- }
- }
-
- for (Entry<OperatorDescriptorId, List<String>> entry : newConstraints.entrySet()) {
- OperatorDescriptorId nopId = entry.getKey();
- List<String> clist = entry.getValue();
- IOperatorDescriptor op = feedInfo.jobSpec.getOperatorMap().get(nopId);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(feedInfo.jobSpec, op,
- clist.toArray(new String[] {}));
- }
-
}
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkRequestResponseHandler.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkRequestResponseHandler.java
new file mode 100644
index 0000000..8f9beb7
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkRequestResponseHandler.java
@@ -0,0 +1,188 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
+import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure;
+import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailureReport;
+import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedInfo;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedWorkRequestResponseHandler implements Runnable {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedWorkRequestResponseHandler.class.getName());
+
+ private final LinkedBlockingQueue<IClusterManagementWorkResponse> inbox;
+
+ private Map<Integer, FeedFailureReport> feedsWaitingForResponse = new HashMap<Integer, FeedFailureReport>();
+
+ public FeedWorkRequestResponseHandler(LinkedBlockingQueue<IClusterManagementWorkResponse> inbox) {
+ this.inbox = inbox;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ IClusterManagementWorkResponse response = null;
+ try {
+ response = inbox.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore interrupt status
+ LOGGER.warning("Interrupted exception " + e.getMessage());
+ continue; // response is still null here; retake from inbox to avoid NPE below
+ }
+ IClusterManagementWork submittedWork = response.getWork();
+ switch (submittedWork.getClusterManagementWorkType()) {
+ case ADD_NODE:
+ AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
+ switch (resp.getStatus()) {
+ case FAILURE:
+ break;
+ case SUCCESS:
+ AddNodeWork work = (AddNodeWork) submittedWork;
+ FeedFailureReport failureReport = feedsWaitingForResponse.remove(work.getWorkId());
+ if (failureReport == null) { break; } Set<FeedInfo> affectedFeeds = failureReport.failures.keySet();
+ for (FeedInfo feedInfo : affectedFeeds) {
+ try {
+ recoverFeed(feedInfo, resp, failureReport.failures.get(feedInfo));
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Recovered feed:" + feedInfo);
+ }
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Unable to recover feed:" + feedInfo);
+ }
+ }
+ }
+ break;
+ }
+ // NOTE(review): removed no-op call resp.getNodesAdded(); its result was ignored
+ break;
+ case REMOVE_NODE:
+ break;
+ }
+ }
+ }
+
+ private void recoverFeed(FeedInfo feedInfo, AddNodeWorkResponse resp, List<FeedFailure> feedFailures)
+ throws Exception {
+ for (FeedFailure feedFailure : feedFailures) {
+ switch (feedFailure.failureType) {
+ case INGESTION_NODE:
+ alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId);
+ break;
+ }
+ }
+ JobSpecification spec = feedInfo.jobSpec; // NOTE(review): unused local — remove or use in the re-submission below
+ // TODO(review): recovery never restarts the job; re-enable: AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
+ }
+
+ private void alterFeedJobSpec(FeedInfo feedInfo, AddNodeWorkResponse resp, String failedNodeId) {
+ Random r = new Random();
+ String[] rnodes = resp.getNodesAdded().toArray(new String[] {});
+ String replacementNode = rnodes[r.nextInt(rnodes.length)];
+ Map<OperatorDescriptorId, IOperatorDescriptor> opMap = feedInfo.jobSpec.getOperatorMap();
+ Set<Constraint> userConstraints = feedInfo.jobSpec.getUserConstraints();
+ List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
+ List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
+ List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
+ Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
+ Map<OperatorDescriptorId, List<String>> newConstraints = new HashMap<OperatorDescriptorId, List<String>>();
+ OperatorDescriptorId opId = null;
+ for (Constraint constraint : userConstraints) {
+ LValueConstraintExpression lexpr = constraint.getLValue();
+ ConstraintExpression cexpr = constraint.getRValue();
+ switch (lexpr.getTag()) {
+ case PARTITION_COUNT:
+ opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+ if (modifiedOperators.contains(opId)) {
+ countConstraintsToReplace.add(constraint);
+ } else {
+ List<Constraint> clist = candidateConstraints.get(opId);
+ if (clist == null) {
+ clist = new ArrayList<Constraint>();
+ candidateConstraints.put(opId, clist);
+ }
+ clist.add(constraint);
+ }
+ break;
+ case PARTITION_LOCATION:
+ opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+ String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+ if (oldLocation.equals(failedNodeId)) {
+ locationConstraintsToReplace.add(constraint);
+ modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
+ List<String> newLocs = newConstraints.get(opId);
+ if (newLocs == null) {
+ newLocs = new ArrayList<String>();
+ newConstraints.put(opId, newLocs);
+ }
+ newLocs.add(replacementNode);
+ } else {
+ if (modifiedOperators.contains(opId)) {
+ locationConstraintsToReplace.add(constraint);
+ List<String> newLocs = newConstraints.get(opId);
+ if (newLocs == null) {
+ newLocs = new ArrayList<String>();
+ newConstraints.put(opId, newLocs);
+ }
+ newLocs.add(oldLocation);
+ } else {
+ List<Constraint> clist = candidateConstraints.get(opId);
+ if (clist == null) {
+ clist = new ArrayList<Constraint>();
+ candidateConstraints.put(opId, clist);
+ }
+ clist.add(constraint);
+ }
+ }
+ break;
+ }
+ }
+
+ feedInfo.jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
+ feedInfo.jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
+
+ for (OperatorDescriptorId mopId : modifiedOperators) {
+ List<Constraint> clist = candidateConstraints.get(mopId);
+ if (clist != null && !clist.isEmpty()) {
+ feedInfo.jobSpec.getUserConstraints().removeAll(clist);
+ }
+ }
+
+ for (Entry<OperatorDescriptorId, List<String>> entry : newConstraints.entrySet()) {
+ OperatorDescriptorId nopId = entry.getKey();
+ List<String> clist = entry.getValue();
+ IOperatorDescriptor op = feedInfo.jobSpec.getOperatorMap().get(nopId);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(feedInfo.jobSpec, op,
+ clist.toArray(new String[] {}));
+ }
+
+ }
+
+ public void registerFeedWork(int workId, FeedFailureReport failureReport) {
+ feedsWaitingForResponse.put(workId, failureReport);
+ }
+}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index 29e3b3b..f764954 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -69,7 +69,7 @@
private State state = State.UNUSABLE;
public void removeNCConfiguration(String nodeId) {
- state = State.UNUSABLE;
+ // state = State.UNUSABLE;
ncConfiguration.remove(nodeId);
}
@@ -115,7 +115,7 @@
public Node getAvailableSubstitutionNode() {
List<Node> subNodes = cluster.getSubstituteNodes().getNode();
- return subNodes.isEmpty() ? null : subNodes.remove(0);
+ return subNodes.isEmpty() ? null : subNodes.get(0);
}
}