checkpoint: handle cluster events on a dedicated ClusterEventHandler thread, generate stable work ids, add ITypedAdapterFactory, and recover feed jobs when ingestion nodes fail
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterEventHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterEventHandler.java
new file mode 100644
index 0000000..091cf4b
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterEventHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.event.schema.cluster.Node;
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
+import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
+import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse.Status;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+
+public class ClusterEventHandler implements Runnable {
+
+ private static final Logger LOGGER = Logger.getLogger(ClusterEventHandler.class.getName());
+
+ private final LinkedBlockingQueue<Set<IClusterManagementWork>> inbox;
+
+ public ClusterEventHandler(LinkedBlockingQueue<Set<IClusterManagementWork>> inbox) {
+ this.inbox = inbox;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Set<IClusterManagementWork> workSet = inbox.take();
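+ // coalesce the batched work items: the largest ADD_NODE request determines how many substitution nodes are provisioned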
+ int nodesToAdd = 0;
+ Set<String> nodesToRemove = new HashSet<String>();
+ Set<IClusterManagementWork> nodeAdditionRequests = new HashSet<IClusterManagementWork>();
+ Set<IClusterManagementWork> nodeRemovalRequests = new HashSet<IClusterManagementWork>();
+ for (IClusterManagementWork w : workSet) {
+ switch (w.getClusterManagementWorkType()) {
+ case ADD_NODE:
+ if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodes()) {
+ nodesToAdd = ((AddNodeWork) w).getNumberOfNodes();
+ }
+ nodeAdditionRequests.add(w);
+ break;
+ case REMOVE_NODE:
+ nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
+ nodeRemovalRequests.add(w);
+ break;
+ }
+ }
+
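+ // request substitution nodes from the cluster description and register each one with the ClusterManager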
+ Set<Node> addedNodes = new HashSet<Node>();
+ for (int i = 0; i < nodesToAdd; i++) {
+ Node node = AsterixClusterProperties.INSTANCE.getAvailableSubstitutionNode();
+ if (node != null) {
+ try {
+ ClusterManager.INSTANCE.addNode(node);
+ addedNodes.add(node);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Added NC at:" + node.getId());
+ }
+ } catch (AsterixException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to add NC at:" + node.getId());
+ }
+ e.printStackTrace();
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to add NC: no more available nodes");
+ }
+ }
+ }
+
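+ // respond to every subscriber that requested nodes; REMOVE_NODE requests are collected above but not yet processed here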
+ for (IClusterManagementWork w : nodeAdditionRequests) {
+ w.getSourceSubscriber().notifyRequestCompletion(
+ new AddNodeWorkResponse((AddNodeWork) w, Status.SUCCESS, addedNodes));
+ }
+
+ } catch (InterruptedException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("interruped" + e.getMessage());
+ }
+ throw new IllegalStateException(e);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Unexpected exception in handling cluster event" + e.getMessage());
+ }
+ }
+
+ }
+ }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 941733e..4f355f8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -14,33 +14,32 @@
*/
package edu.uci.ics.asterix.hyracks.bootstrap;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
-import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
-import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse.Status;
-import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.hyracks.api.application.IClusterLifecycleListener;
public class ClusterLifecycleListener implements IClusterLifecycleListener {
+ private static final Logger LOGGER = Logger.getLogger(ClusterLifecycleListener.class.getName());
+
+ private static final LinkedBlockingQueue<Set<IClusterManagementWork>> outbox = new LinkedBlockingQueue<Set<IClusterManagementWork>>();
+
+ private static ClusterEventHandler eventHandler = new ClusterEventHandler(outbox);
+
public static ClusterLifecycleListener INSTANCE = new ClusterLifecycleListener();
private ClusterLifecycleListener() {
+ // outbox and eventHandler are declared above INSTANCE so they are initialized before this constructor runs
+ Thread t = new Thread(eventHandler);
+ t.start();
}
- private static final Logger LOGGER = Logger.getLogger(ClusterLifecycleListener.class.getName());
@Override
public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) {
@@ -64,52 +63,13 @@
work.addAll(sub.notifyNodeFailure(deadNodeIds));
}
- int nodesToAdd = 0;
- Set<String> nodesToRemove = new HashSet<String>();
- Set<IClusterManagementWork> nodeAdditionRequests = new HashSet<IClusterManagementWork>();
- Set<IClusterManagementWork> nodeRemovalRequests = new HashSet<IClusterManagementWork>();
- for (IClusterManagementWork w : work) {
- switch (w.getClusterManagementWorkType()) {
- case ADD_NODE:
- if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodes()) {
- nodesToAdd = ((AddNodeWork) w).getNumberOfNodes();
- }
- nodeAdditionRequests.add(w);
- break;
- case REMOVE_NODE:
- nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
- nodeRemovalRequests.add(w);
- break;
+ try {
+ outbox.put(work);
+ } catch (InterruptedException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Interrupted :" + e.getMessage());
}
}
- Set<Node> addedNodes = new HashSet<Node>();
- for (int i = 0; i < nodesToAdd; i++) {
- Node node = AsterixClusterProperties.INSTANCE.getAvailableSubstitutionNode();
- if (node != null) {
- try {
- ClusterManager.INSTANCE.addNode(node);
- addedNodes.add(node);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Added NC at:" + node.getId());
- }
- } catch (AsterixException e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to add NC at:" + node.getId());
- }
- e.printStackTrace();
- }
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to add NC: no more available nodes");
- }
- }
- }
-
- for (IClusterManagementWork w : nodeAdditionRequests) {
- w.getSourceSubscriber().notifyRequestCompletion(
- new AddNodeWorkResponse((AddNodeWork) w, Status.SUCCESS, addedNodes));
- }
-
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
index bde860f..6948dbc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
@@ -23,13 +23,16 @@
protected final IClusterEventsSubscriber subscriber;
+ protected final int workId;
+
@Override
public int getWorkId() {
- return WorkIdGenerator.getNextWorkId();
+ return workId;
}
public AbstractClusterManagementWork(IClusterEventsSubscriber subscriber) {
this.subscriber = subscriber;
+ this.workId = WorkIdGenerator.getNextWorkId();
}
private static class WorkIdGenerator {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
index e720c9f..5970978 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
@@ -13,10 +13,12 @@
this.status = status;
}
+ @Override
public IClusterManagementWork getWork() {
return work;
}
+ @Override
public Status getStatus() {
return status;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
index ae3ce7e..1ac9e34 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
@@ -1,11 +1,22 @@
package edu.uci.ics.asterix.metadata.cluster;
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+
public interface IClusterManagementWorkResponse {
public enum Status {
SUCCESS,
FAILURE
}
-
-
+
+ /**
+ * @return the cluster management work item that this response corresponds to
+ */
+ public IClusterManagementWork getWork();
+
+ /**
+ * @return the completion status (SUCCESS or FAILURE) of the submitted work
+ */
+ public Status getStatus();
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 8c03198..9544abd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -65,6 +65,7 @@
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.types.ARecordType;
@@ -443,10 +444,10 @@
}
Map<String, Object> configuration = this.wrapProperties(datasetDetails.getProperties());
-
+
switch (adapterFactory.getAdapterType()) {
case TYPED:
- adapterOutputType = null;
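+ // a typed adapter factory knows its output record type up front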
+ adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
break;
case GENERIC:
String outputTypeName = datasetDetails.getProperties().get("output-type-name");
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
index 3a981b2..2613aad 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
@@ -186,7 +186,7 @@
try {
IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
JobInfo info = hcc.getJobInfo(message.jobId);
-
+ feedInfo.jobInfo = info;
Map<String, String> feedActivityDetails = new HashMap<String, String>();
StringBuilder ingestLocs = new StringBuilder();
for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
@@ -279,6 +279,7 @@
public JobSpecification jobSpec;
public List<String> ingestLocations = new ArrayList<String>();
public List<String> computeLocations = new ArrayList<String>();
+ public JobInfo jobInfo;
public FeedInfo(FeedId feedId, JobSpecification jobSpec) {
this.feedId = feedId;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
index bd154e6..257c8fd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
@@ -24,27 +24,40 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure.FailureType;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -61,10 +74,13 @@
private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(FeedLifecycleListener.class.getName());
+
public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
private LinkedBlockingQueue<Message> jobEventInbox;
private LinkedBlockingQueue<FeedFailureReport> failureEventInbox;
+ private Map<Integer, FeedFailureReport> feedsWaitingForResponse = new HashMap<Integer, FeedFailureReport>();
private FeedLifecycleListener() {
jobEventInbox = new LinkedBlockingQueue<Message>();
@@ -216,7 +232,7 @@
try {
IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
JobInfo info = hcc.getJobInfo(message.jobId);
-
+ feedInfo.jobInfo = info;
Map<String, String> feedActivityDetails = new HashMap<String, String>();
StringBuilder ingestLocs = new StringBuilder();
for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
@@ -309,6 +325,7 @@
public JobSpecification jobSpec;
public List<String> ingestLocations = new ArrayList<String>();
public List<String> computeLocations = new ArrayList<String>();
+ public JobInfo jobInfo;
public FeedInfo(FeedId feedId, JobSpecification jobSpec) {
this.feedId = feedId;
@@ -341,6 +358,7 @@
}
}
}
+
return handleFailure(failureReport);
}
@@ -374,6 +392,7 @@
AddNodeWork addNodesWork = new AddNodeWork(failureMap.keySet().size(), this);
work.add(addNodesWork);
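+ // stash the failure report under the work id so notifyRequestCompletion can correlate the asynchronous response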
+ feedsWaitingForResponse.put(addNodesWork.getWorkId(), failureReport);
return work;
}
@@ -402,7 +421,133 @@
@Override
public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
- // TODO Auto-generated method stub
+ IClusterManagementWork submittedWork = response.getWork();
+ switch (submittedWork.getClusterManagementWorkType()) {
+ case ADD_NODE:
+ AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
+ switch (resp.getStatus()) {
+ case FAILURE:
+ break;
+ case SUCCESS:
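+ // retrieve the failure report stored when the add-node work was submitted and attempt to recover each affected feed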
+ AddNodeWork work = (AddNodeWork) submittedWork;
+ FeedFailureReport failureReport = feedsWaitingForResponse.remove(work.getWorkId());
+ Set<FeedInfo> affectedFeeds = failureReport.failures.keySet();
+ for (FeedInfo feedInfo : affectedFeeds) {
+ try {
+ recoverFeed(feedInfo, resp, failureReport.failures.get(feedInfo));
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Recovered feed:" + feedInfo);
+ }
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Unable to recover feed:" + feedInfo);
+ }
+ }
+ }
+ break;
+ }
+ break;
+ case REMOVE_NODE:
+ break;
+ }
+ }
+
+ private void recoverFeed(FeedInfo feedInfo, AddNodeWorkResponse resp, List<FeedFailure> feedFailures)
+ throws Exception {
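+ // only INGESTION_NODE failures trigger a job-spec rewrite here; the job is then resubmitted in full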
+ for (FeedFailure feedFailure : feedFailures) {
+ switch (feedFailure.failureType) {
+ case INGESTION_NODE:
+ alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId);
+ break;
+ }
+ }
+ AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
+ }
+
+ private void alterFeedJobSpec(FeedInfo feedInfo, AddNodeWorkResponse resp, String failedNodeId) {
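+ // pick a replacement at random from the newly added nodes; its id is prefixed with the cluster instance name
+ // to form the node controller id used in location constraints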
+ Random r = new Random();
+ Object[] rnodes = resp.getNodesAdded().toArray();
+ Node replacementNode = (Node) rnodes[r.nextInt(rnodes.length)];
+ String replacementNodeId = AsterixClusterProperties.INSTANCE.getCluster().getInstanceName() + "_"
+ + replacementNode.getId();
+ Map<OperatorDescriptorId, IOperatorDescriptor> opMap = feedInfo.jobSpec.getOperatorMap();
+ Set<Constraint> userConstraints = feedInfo.jobSpec.getUserConstraints();
+ List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
+ List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
+ List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
+ Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
+ Map<OperatorDescriptorId, List<String>> newConstraints = new HashMap<OperatorDescriptorId, List<String>>();
+ OperatorDescriptorId opId = null;
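+ // classify the user constraints: location constraints pinned to the failed node are replaced with the substitute,
+ // and count constraints of any modified operator are dropped so absolute location constraints can be re-applied below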
+ for (Constraint constraint : userConstraints) {
+ LValueConstraintExpression lexpr = constraint.getLValue();
+ ConstraintExpression cexpr = constraint.getRValue();
+ switch (lexpr.getTag()) {
+ case PARTITION_COUNT:
+ opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+ if (modifiedOperators.contains(opId)) {
+ countConstraintsToReplace.add(constraint);
+ } else {
+ List<Constraint> clist = candidateConstraints.get(opId);
+ if (clist == null) {
+ clist = new ArrayList<Constraint>();
+ candidateConstraints.put(opId, clist);
+ }
+ clist.add(constraint);
+ }
+ break;
+ case PARTITION_LOCATION:
+ opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+ String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+ if (oldLocation.equals(failedNodeId)) {
+ locationConstraintsToReplace.add(constraint);
+ modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
+ List<String> newLocs = newConstraints.get(opId);
+ if (newLocs == null) {
+ newLocs = new ArrayList<String>();
+ newConstraints.put(opId, newLocs);
+ }
+ newLocs.add(replacementNodeId);
+ } else {
+ if (modifiedOperators.contains(opId)) {
+ locationConstraintsToReplace.add(constraint);
+ List<String> newLocs = newConstraints.get(opId);
+ if (newLocs == null) {
+ newLocs = new ArrayList<String>();
+ newConstraints.put(opId, newLocs);
+ }
+ newLocs.add(oldLocation);
+ } else {
+ List<Constraint> clist = candidateConstraints.get(opId);
+ if (clist == null) {
+ clist = new ArrayList<Constraint>();
+ candidateConstraints.put(opId, clist);
+ }
+ clist.add(constraint);
+ }
+ }
+ break;
+ }
+ }
+
+ feedInfo.jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
+ feedInfo.jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
+
+ for (OperatorDescriptorId mopId : modifiedOperators) {
+ List<Constraint> clist = candidateConstraints.get(mopId);
+ if (clist != null && !clist.isEmpty()) {
+ feedInfo.jobSpec.getUserConstraints().removeAll(clist);
+ }
+ }
+
+ for (Entry<OperatorDescriptorId, List<String>> entry : newConstraints.entrySet()) {
+ OperatorDescriptorId nopId = entry.getKey();
+ List<String> clist = entry.getValue();
+ IOperatorDescriptor op = feedInfo.jobSpec.getOperatorMap().get(nopId);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(feedInfo.jobSpec, op,
+ clist.toArray(new String[] {}));
+ }
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
new file mode 100644
index 0000000..a871aba
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+
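+/**
+ * An adapter factory whose adapters produce records of a fixed, pre-declared type,
+ * exposed through {@link #getAdapterOutputType()}.
+ */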
+public interface ITypedAdapterFactory extends IAdapterFactory {
+
+ public ARecordType getAdapterOutputType();
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index fe4212d..4742b16 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -38,25 +38,11 @@
private static final long serialVersionUID = 1L;
private Map<String, Object> configuration;
- public SyntheticTwitterFeedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx)
- throws AsterixException {
+ public SyntheticTwitterFeedAdapter(Map<String, Object> configuration, ARecordType outputType,
+ IHyracksTaskContext ctx) throws AsterixException {
super(configuration, ctx);
this.configuration = configuration;
-
- String[] userFieldNames = new String[] { "screen-name", "lang", "friends_count", "statuses_count", "name",
- "followers_count" };
-
- IAType[] userFieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
- BuiltinType.AINT32, BuiltinType.ASTRING, BuiltinType.AINT32 };
- ARecordType userRecordType = new ARecordType("TwitterUserType", userFieldNames, userFieldTypes, false);
-
- String[] fieldNames = new String[] { "tweetid", "user", "sender-location", "send-time", "referred-topics",
- "message-text" };
-
- AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
- IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, userRecordType, BuiltinType.APOINT,
- BuiltinType.ADATETIME, unorderedListType, BuiltinType.ASTRING };
- adapterOutputType = new ARecordType("TweetMessageType", fieldNames, fieldTypes, false);
+ this.adapterOutputType = outputType;
}
@Override
@@ -64,7 +50,6 @@
return new SyntheticTwitterFeedClient(configuration, adapterOutputType, partition);
}
-
private static class SyntheticTwitterFeedClient extends PullBasedFeedClient implements IPullBasedFeedClient {
private static final Logger LOGGER = Logger.getLogger(SyntheticTwitterFeedClient.class.getName());
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 48b1d20..3d05870 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -14,11 +14,28 @@
*/
package edu.uci.ics.asterix.tools.external.data;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.AdapterType;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -28,7 +45,7 @@
* on the local file system or on HDFS. The feed ends when the content of the
* source file has been ingested.
*/
-public class SyntheticTwitterFeedAdapterFactory implements IAdapterFactory {
+public class SyntheticTwitterFeedAdapterFactory implements ITypedAdapterFactory {
/**
*
@@ -37,6 +54,10 @@
private Map<String, Object> configuration;
+ private static final String KEY_DATAVERSE_DATASET = "dataverse-dataset";
+
+ private static final ARecordType outputType = initOutputType();
+
@Override
public String getName() {
return "synthetic_twitter_feed";
@@ -59,12 +80,61 @@
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return new AlgebricksCountPartitionConstraint(1);
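+ // the "dataverse-dataset" property is expected to be of the form <dataverse>:<dataset>;
+ // the ingestion operator is placed on a node that does not host the dataset's storage partitions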
+ String dvds = (String) configuration.get(KEY_DATAVERSE_DATASET);
+ String[] components = dvds.split(":");
+ String dataverse = components[0];
+ String dataset = components[1];
+ MetadataTransactionContext ctx = null;
+ NodeGroup ng = null;
+ try {
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ Dataset ds = MetadataManager.INSTANCE.getDataset(ctx, dataverse, dataset);
+ String nodegroupName = ((FeedDatasetDetails) ds.getDatasetDetails()).getNodeGroupName();
+ ng = MetadataManager.INSTANCE.getNodegroup(ctx, nodegroupName);
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ } catch (Exception e) {
+ if (ctx != null) {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ }
+ throw e;
+ }
+ List<String> storageNodes = ng.getNodeNames();
+ Set<String> nodes = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
+ nodes.removeAll(storageNodes);
+ Random r = new Random();
+ String ingestionLocation = nodes.toArray(new String[] {})[r.nextInt(nodes.size())];
+ return new AlgebricksAbsolutePartitionConstraint(new String[] { ingestionLocation });
}
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
- return new SyntheticTwitterFeedAdapter(configuration, ctx);
+ return new SyntheticTwitterFeedAdapter(configuration, outputType, ctx);
}
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return outputType;
+ }
+
+ private static ARecordType initOutputType() {
+ ARecordType outputType = null;
+ try {
+ String[] userFieldNames = new String[] { "screen-name", "lang", "friends_count", "statuses_count", "name",
+ "followers_count" };
+
+ IAType[] userFieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
+ BuiltinType.AINT32, BuiltinType.ASTRING, BuiltinType.AINT32 };
+ ARecordType userRecordType = new ARecordType("TwitterUserType", userFieldNames, userFieldTypes, false);
+
+ String[] fieldNames = new String[] { "tweetid", "user", "sender-location", "send-time", "referred-topics",
+ "message-text" };
+
+ AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
+ IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, userRecordType, BuiltinType.APOINT,
+ BuiltinType.ADATETIME, unorderedListType, BuiltinType.ASTRING };
+ outputType = new ARecordType("TweetMessageType", fieldNames, fieldTypes, false);
+
+ } catch (AsterixException e) {
+ throw new IllegalStateException("Unable to initialize output type", e);
+ }
+ return outputType;
+ }
}
\ No newline at end of file