modified strategy to choose a substitute node in event of failure
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
index a89b6d5..6985019 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -74,7 +74,7 @@
LinkedBlockingQueue<String> queue = FeedLifecycleListener.INSTANCE.getFeedReportQueue(feedId);
String report = null;
try {
- report = queue.poll(5, TimeUnit.SECONDS);
+ report = queue.poll(25, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 923e841..37c6e8a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -81,6 +81,10 @@
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -258,7 +262,21 @@
if (registeredFeeds.containsKey(jobId)) {
throw new IllegalStateException(" Feed already registered ");
}
- registeredFeeds.put(jobId, new FeedInfo(feedId, jobSpec, feedPolicy));
+ registeredFeeds.put(jobId, new FeedInfo(feedId, jobSpec, feedPolicy, jobId));
+ }
+
+ public void deregisterFeed(JobId jobId) {
+ FeedInfo feedInfo = registeredFeeds.remove(jobId);
+ if (feedInfo != null) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("DeRegistered Feed Info :" + feedInfo);
+ }
+ }
+ }
+
+ public void deregisterFeed(FeedInfo feedInfo) {
+ JobId jobId = feedInfo.jobId;
+ deregisterFeed(jobId);
}
@Override
@@ -280,6 +298,7 @@
LOGGER.info("Job finished for feed id" + feedInfo.feedConnectionId);
}
handleJobFinishMessage(feedInfo, mesg);
+ deregisterFeed(mesg.jobId);
break;
}
} catch (InterruptedException e) {
@@ -429,41 +448,90 @@
private void handleJobFinishMessage(FeedInfo feedInfo, Message message) {
MetadataManager.INSTANCE.acquireWriteLatch();
MetadataTransactionContext mdTxnCtx = null;
- try {
- IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
- JobInfo info = hcc.getJobInfo(message.jobId);
- JobStatus status = info.getPendingStatus();
- List<Exception> exceptions;
- boolean failure = status != null && status.equals(JobStatus.FAILURE);
- FeedActivityType activityType = FeedActivityType.FEED_END;
- Map<String, String> details = new HashMap<String, String>();
- if (failure) {
- exceptions = info.getPendingExceptions();
- activityType = FeedActivityType.FEED_FAILURE;
- details.put(FeedActivity.FeedActivityDetails.EXCEPTION_MESSAGE, exceptions.get(0).getMessage());
- }
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- FeedActivity feedActivity = new FeedActivity(feedInfo.feedConnectionId.getDataverse(),
- feedInfo.feedConnectionId.getFeedName(), feedInfo.feedConnectionId.getDatasetName(),
- activityType, details);
- MetadataManager.INSTANCE.registerFeedActivity(
- mdTxnCtx,
- new FeedConnectionId(feedInfo.feedConnectionId.getDataverse(), feedInfo.feedConnectionId
- .getFeedName(), feedInfo.feedConnectionId.getDatasetName()), feedActivity);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (RemoteException | ACIDException | MetadataException e) {
+ boolean feedFailedDueToPostSubmissionNodeLoss = verfyReasonForFailure(feedInfo);
+ if (!feedFailedDueToPostSubmissionNodeLoss) {
try {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- } catch (RemoteException | ACIDException ae) {
- throw new IllegalStateException(" Unable to abort ");
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ JobInfo info = hcc.getJobInfo(message.jobId);
+ JobStatus status = info.getPendingStatus();
+ List<Exception> exceptions;
+ boolean failure = status != null && status.equals(JobStatus.FAILURE);
+ FeedActivityType activityType = FeedActivityType.FEED_END;
+ Map<String, String> details = new HashMap<String, String>();
+ if (failure) {
+ exceptions = info.getPendingExceptions();
+ activityType = FeedActivityType.FEED_FAILURE;
+ details.put(FeedActivity.FeedActivityDetails.EXCEPTION_MESSAGE, exceptions.get(0).getMessage());
+ }
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ FeedActivity feedActivity = new FeedActivity(feedInfo.feedConnectionId.getDataverse(),
+ feedInfo.feedConnectionId.getFeedName(), feedInfo.feedConnectionId.getDatasetName(),
+ activityType, details);
+ MetadataManager.INSTANCE.registerFeedActivity(mdTxnCtx, new FeedConnectionId(
+ feedInfo.feedConnectionId.getDataverse(), feedInfo.feedConnectionId.getFeedName(),
+ feedInfo.feedConnectionId.getDatasetName()), feedActivity);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (RemoteException | ACIDException | MetadataException e) {
+ try {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ } catch (RemoteException | ACIDException ae) {
+ throw new IllegalStateException(" Unable to abort ");
+ }
+ } catch (Exception e) {
+ // add exception handling here
+ } finally {
+ MetadataManager.INSTANCE.releaseWriteLatch();
}
- } catch (Exception e) {
- // add exception handling here
- } finally {
- MetadataManager.INSTANCE.releaseWriteLatch();
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Attempt to revive feed");
+ }
+ FeedsActivator activator = new FeedsActivator();
+ String dataverse = feedInfo.feedConnectionId.getDataverse();
+ String datasetName = feedInfo.feedConnectionId.getDatasetName();
+ String feedName = feedInfo.feedConnectionId.getFeedName();
+ String feedPolicy = feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+ activator.reviveFeed(dataverse, feedName, datasetName, feedPolicy);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Revived Feed");
+ }
+
}
}
+ private boolean verfyReasonForFailure(FeedInfo feedInfo) {
+ JobSpecification spec = feedInfo.jobSpec;
+ Set<Constraint> userConstraints = spec.getUserConstraints();
+ List<String> locations = new ArrayList<String>();
+ for (Constraint constraint : userConstraints) {
+ LValueConstraintExpression lexpr = constraint.getLValue();
+ ConstraintExpression cexpr = constraint.getRValue();
+ switch (lexpr.getTag()) {
+ case PARTITION_LOCATION:
+ String location = (String) ((ConstantExpression) cexpr).getValue();
+ locations.add(location);
+ break;
+ }
+ }
+ Set<String> participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
+ List<String> nodesFailedPostSubmission = new ArrayList<String>();
+ for (String location : locations) {
+ if (!participantNodes.contains(location)) {
+ nodesFailedPostSubmission.add(location);
+ }
+ }
+
+ if (nodesFailedPostSubmission.size() > 0) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Feed failed as nodes failed post submission");
+ }
+ return true;
+ } else {
+ return false;
+ }
+
+ }
+
public static class FeedMessengerMessage {
private final IFeedMessage message;
private final FeedInfo feedInfo;
@@ -522,11 +590,13 @@
public List<String> storageLocations = new ArrayList<String>();
public JobInfo jobInfo;
public Map<String, String> feedPolicy;
+ public JobId jobId;
- public FeedInfo(FeedConnectionId feedId, JobSpecification jobSpec, Map<String, String> feedPolicy) {
+ public FeedInfo(FeedConnectionId feedId, JobSpecification jobSpec, Map<String, String> feedPolicy, JobId jobId) {
this.feedConnectionId = feedId;
this.jobSpec = jobSpec;
this.feedPolicy = feedPolicy;
+ this.jobId = jobId;
}
@Override
@@ -542,6 +612,11 @@
return feedConnectionId.hashCode();
}
+ @Override
+ public String toString() {
+ return feedConnectionId + " job id " + jobId;
+ }
+
}
@Override
@@ -556,6 +631,9 @@
failures = new ArrayList<FeedFailure>();
failureReport.failures.put(feedInfo, failures);
}
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Inestion Node Failure! " + deadNodeId);
+ }
failures.add(new FeedFailure(FeedFailure.FailureType.INGESTION_NODE, deadNodeId));
}
if (feedInfo.computeLocations.contains(deadNodeId)) {
@@ -564,6 +642,9 @@
failures = new ArrayList<FeedFailure>();
failureReport.failures.put(feedInfo, failures);
}
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Compute Node Failure! " + deadNodeId);
+ }
failures.add(new FeedFailure(FeedFailure.FailureType.COMPUTE_NODE, deadNodeId));
}
if (feedInfo.storageLocations.contains(deadNodeId)) {
@@ -572,6 +653,9 @@
failures = new ArrayList<FeedFailure>();
failureReport.failures.put(feedInfo, failures);
}
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Storage Node Failure! " + deadNodeId);
+ }
failures.add(new FeedFailure(FeedFailure.FailureType.STORAGE_NODE, deadNodeId));
}
}
@@ -596,6 +680,7 @@
builder.append("\n");
for (FeedInfo fInfo : failureReport.failures.keySet()) {
builder.append(fInfo.feedConnectionId);
+ feedJobNotificationHandler.deregisterFeed(fInfo);
}
LOGGER.warning(builder.toString());
}
@@ -699,6 +784,17 @@
}
AddNodeWork addNodesWork = new AddNodeWork(failureMap.keySet().size(), this);
work.add(addNodesWork);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ Map<FeedInfo, List<FeedFailure>> feedFailures = failureReport.failures;
+ for (Entry<FeedInfo, List<FeedFailure>> entry : feedFailures.entrySet()) {
+ for (FeedFailure f : entry.getValue()) {
+ LOGGER.info("Feed Failure! " + f.failureType + " " + f.nodeId);
+ }
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registered work id: " + addNodesWork.getWorkId());
+ }
feedWorkRequestResponseHandler.registerFeedWork(addNodesWork.getWorkId(), failureReport);
} else {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -992,7 +1088,7 @@
}
}
- private void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
+ public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
PrintWriter writer = new PrintWriter(System.out, true);
SessionConfig pc = new SessionConfig(true, false, false, false, false, false, true, true, false);
try {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
index e88bff8..3197bbb 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
@@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -92,7 +93,7 @@
Set<FeedInfo> affectedFeeds = failureReport.failures.keySet();
for (FeedInfo feedInfo : affectedFeeds) {
try {
- recoverFeed(feedInfo, resp, failureReport.failures.get(feedInfo));
+ recoverFeed(feedInfo, work, resp, failureReport.failures.get(feedInfo));
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Recovered feed:" + feedInfo);
}
@@ -109,73 +110,124 @@
}
}
- private void recoverFeed(FeedInfo feedInfo, AddNodeWorkResponse resp, List<FeedFailure> feedFailures)
- throws Exception {
+ private void recoverFeed(FeedInfo feedInfo, AddNodeWork work, AddNodeWorkResponse resp,
+ List<FeedFailure> feedFailures) throws Exception {
+ List<String> failedNodeIds = new ArrayList<String>();
for (FeedFailure feedFailure : feedFailures) {
- switch (feedFailure.failureType) {
- case INGESTION_NODE:
- alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId);
- break;
- }
+ failedNodeIds.add(feedFailure.nodeId);
}
- JobSpecification spec = feedInfo.jobSpec;
- System.out.println("Altered Job Spec \n" + spec);
- Thread.sleep(5000);
- AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
- }
-
- private void alterFeedJobSpec(FeedInfo feedInfo, AddNodeWorkResponse resp, String failedNodeId) {
- String replacementNode = null;
+ List<String> chosenReplacements = new ArrayList<String>();
+ String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
+ chosenReplacements.add(metadataNodeName);
switch (resp.getStatus()) {
case FAILURE:
- // TODO 1st preference is given to any other participant node that is not involved in the feed.
- // 2nd preference is given to a compute node.
- // 3rd preference is given to a storage node
- Set<String> participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
- if (participantNodes != null && !participantNodes.isEmpty()) {
- participantNodes.removeAll(feedInfo.storageLocations);
- participantNodes.removeAll(feedInfo.computeLocations);
- if (!participantNodes.isEmpty()) {
- String[] participantNodesArray = AsterixClusterProperties.INSTANCE.getParticipantNodes()
- .toArray(new String[] {});
-
- replacementNode = participantNodesArray[new Random().nextInt(participantNodesArray.length)];
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Participant Node: " + replacementNode + " chosen as replacement for "
- + failedNodeId);
- }
- }
- }
-
- if (replacementNode == null) {
- boolean computeNodeSubstitute = (feedInfo.computeLocations.contains(failedNodeId) && feedInfo.computeLocations
- .size() > 1);
- if (computeNodeSubstitute) {
- feedInfo.computeLocations.remove(failedNodeId);
- replacementNode = feedInfo.computeLocations.get(0);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Compute node:" + replacementNode + " chosen to replace " + failedNodeId);
- }
- } else {
- replacementNode = feedInfo.storageLocations.get(0);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Storage node:" + replacementNode + " chosen to replace " + failedNodeId);
- }
+ for (FeedFailure feedFailure : feedFailures) {
+ switch (feedFailure.failureType) {
+ case INGESTION_NODE:
+ String replacement = getInternalReplacement(feedInfo, feedFailure, failedNodeIds,
+ chosenReplacements);
+ chosenReplacements.add(replacement);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Existing node:" + replacement + " chosen to replace "
+ + feedFailure.nodeId);
+ }
+ alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId, replacement);
+ break;
}
}
break;
case SUCCESS:
- Random r = new Random();
- String[] rnodes = resp.getNodesAdded().toArray(new String[] {});
- replacementNode = rnodes[r.nextInt(rnodes.length)];
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Newly added node:" + replacementNode + " chosen to replace " + failedNodeId);
+ List<String> nodesAdded = resp.getNodesAdded();
+ int numNodesAdded = nodesAdded.size();
+ int nodeIndex = 0;
+ for (FeedFailure feedFailure : feedFailures) {
+ switch (feedFailure.failureType) {
+ case INGESTION_NODE:
+ String replacement = null;
+ if (nodeIndex <= numNodesAdded - 1) {
+ replacement = nodesAdded.get(nodeIndex);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Newly added node:" + replacement + " chosen to replace "
+ + feedFailure.nodeId);
+ }
+ } else {
+ replacement = getInternalReplacement(feedInfo, feedFailure, failedNodeIds,
+ chosenReplacements);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Existing node:" + replacement + " chosen to replace "
+ + feedFailure.nodeId);
+ }
+ chosenReplacements.add(replacement);
+ }
+ alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId, replacement);
+ nodeIndex++;
+ break;
+ default: // ingestion nodes and compute nodes (in currrent implementation) coincide.
+ // so correcting ingestion node failure also takes care of compute nodes failure.
+ // Storage node failures cannot be recovered from as in current implementation, we
+ // do not have data replication.
+ }
}
-
break;
}
- if (replacementNode == null) {
+ JobSpecification spec = feedInfo.jobSpec;
+ System.out.println("Final recovery Job Spec \n" + spec);
+ Thread.sleep(5000);
+ AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
+ }
+
+ private String getInternalReplacement(FeedInfo feedInfo, FeedFailure feedFailure, List<String> failedNodeIds,
+ List<String> chosenReplacements) {
+ String failedNodeId = feedFailure.nodeId;
+ String replacement = null;;
+ // TODO 1st preference is given to any other participant node that is not involved in the feed.
+ // 2nd preference is given to a compute node.
+ // 3rd preference is given to a storage node
+ Set<String> participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
+ if (participantNodes != null && !participantNodes.isEmpty()) {
+ List<String> pNodesClone = new ArrayList<String>();
+ pNodesClone.addAll(participantNodes);
+ pNodesClone.removeAll(feedInfo.storageLocations);
+ pNodesClone.removeAll(feedInfo.computeLocations);
+ pNodesClone.removeAll(feedInfo.ingestLocations);
+ pNodesClone.removeAll(chosenReplacements);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ for (String candidateNode : pNodesClone) {
+ LOGGER.info("Candidate for replacement:" + candidateNode);
+ }
+ }
+ if (!pNodesClone.isEmpty()) {
+ String[] participantNodesArray = pNodesClone.toArray(new String[] {});
+
+ replacement = participantNodesArray[new Random().nextInt(participantNodesArray.length)];
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Participant Node: " + replacement + " chosen as replacement for " + failedNodeId);
+ }
+ }
+ }
+
+ if (replacement == null) {
+ feedInfo.computeLocations.removeAll(failedNodeIds);
+ boolean computeNodeSubstitute = (feedInfo.computeLocations.size() > 1);
+ if (computeNodeSubstitute) {
+ replacement = feedInfo.computeLocations.get(0);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Compute node:" + replacement + " chosen to replace " + failedNodeId);
+ }
+ } else {
+ replacement = feedInfo.storageLocations.get(0);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Storage node:" + replacement + " chosen to replace " + failedNodeId);
+ }
+ }
+ }
+ return replacement;
+ }
+
+ private void alterFeedJobSpec(FeedInfo feedInfo, AddNodeWorkResponse resp, String failedNodeId, String replacement) {
+ if (replacement == null) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Unable to find replacement for failed node :" + failedNodeId);
LOGGER.severe("Feed: " + feedInfo.feedConnectionId + " will be terminated");
@@ -185,7 +237,7 @@
Thread t = new Thread(new FeedsDeActivator(feedsToTerminate));
t.start();
} else {
- replaceNode(feedInfo.jobSpec, failedNodeId, replacementNode);
+ replaceNode(feedInfo.jobSpec, failedNodeId, replacement);
}
}
@@ -195,7 +247,7 @@
List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
- Map<OperatorDescriptorId, List<String>> newConstraints = new HashMap<OperatorDescriptorId, List<String>>();
+ Map<OperatorDescriptorId, Map<Integer, String>> newConstraints = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
OperatorDescriptorId opId = null;
for (Constraint constraint : userConstraints) {
LValueConstraintExpression lexpr = constraint.getLValue();
@@ -220,21 +272,23 @@
if (oldLocation.equals(failedNodeId)) {
locationConstraintsToReplace.add(constraint);
modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
- List<String> newLocs = newConstraints.get(opId);
+ Map<Integer, String> newLocs = newConstraints.get(opId);
if (newLocs == null) {
- newLocs = new ArrayList<String>();
+ newLocs = new HashMap<Integer, String>();
newConstraints.put(opId, newLocs);
}
- newLocs.add(replacementNode);
+ int partition = ((PartitionLocationExpression) lexpr).getPartition();
+ newLocs.put(partition, replacementNode);
} else {
if (modifiedOperators.contains(opId)) {
locationConstraintsToReplace.add(constraint);
- List<String> newLocs = newConstraints.get(opId);
+ Map<Integer, String> newLocs = newConstraints.get(opId);
if (newLocs == null) {
- newLocs = new ArrayList<String>();
+ newLocs = new HashMap<Integer, String>();
newConstraints.put(opId, newLocs);
}
- newLocs.add(oldLocation);
+ int partition = ((PartitionLocationExpression) lexpr).getPartition();
+ newLocs.put(partition, oldLocation);
} else {
List<Constraint> clist = candidateConstraints.get(opId);
if (clist == null) {
@@ -259,18 +313,23 @@
for (Constraint c : clist) {
if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
ConstraintExpression cexpr = c.getRValue();
+ int partition = ((PartitionLocationExpression) c.getLValue()).getPartition();
String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
- newConstraints.get(mopId).add(oldLocation);
+ newConstraints.get(mopId).put(partition, oldLocation);
}
}
}
}
- for (Entry<OperatorDescriptorId, List<String>> entry : newConstraints.entrySet()) {
+ for (Entry<OperatorDescriptorId, Map<Integer, String>> entry : newConstraints.entrySet()) {
OperatorDescriptorId nopId = entry.getKey();
- List<String> clist = entry.getValue();
+ Map<Integer, String> clist = entry.getValue();
IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, clist.toArray(new String[] {}));
+ String[] locations = new String[clist.size()];
+ for (int i = 0; i < locations.length; i++) {
+ locations[i] = clist.get(i);
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations);
}
}
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index a5f1813..805f8ea 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -78,9 +78,9 @@
function initTimeline(ingestLocations) {
document.write("<i>" + "Feed Ingestion" + " " + "<i>");
- document.write("<br />" + "Ingestion Locations: " + ingestLocations);
- document.write("<br />" + "Compute Locations: " + computeLocations);
- document.write("<br />" + "Storage Locations: " + storageLocations);
+ document.write("<br />" + "Ingestion Locations: " + ingestLocations.replace(",",", "));
+ document.write("<br />" + "Compute Locations: " + computeLocations.replace(",",", "));
+ document.write("<br />" + "Storage Locations: " + storageLocations.replace(",",", "));
document.write("<br />" + "Ingestion Policy: " + ingestionPolicy);
document.write("<br />" + "Status: " + state);
document.write("<br />");