1) Minor fix in the feed console. 2) Fixed a bug that caused incorrect evaluation of the feed adapter's output type.
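
The feed console servlet (FeedDataProviderServlet) now distinguishes two JSON
message types: a "report" message carrying the latest throughput report, and a
"reload" message carrying the feed's ingest/compute/storage locations when no
report is available but the metadata shows the feed is still active. FeedUtil
moves from asterix-app (edu.uci.ics.asterix.file) to asterix-metadata
(edu.uci.ics.asterix.metadata.feeds), and the dashboard reloads itself when it
receives a "reload" message.

A minimal sketch of the two payload shapes, built with the same org.json API
the servlet uses; the location and throughput values below are illustrative
placeholders, not output from a real feed:

    import org.json.JSONException;
    import org.json.JSONObject;

    public class FeedMessageSketch {
        public static void main(String[] args) throws JSONException {
            // "report": sent when a throughput report is available; the
            // dashboard splits the value on '|' into per-partition figures.
            JSONObject report = new JSONObject();
            report.put("type", "report");
            report.put("time", System.currentTimeMillis());
            report.put("value", "120|80|95");

            // "reload": sent when the feed is active but no report arrived;
            // the dashboard reloads the page with the feed's current locations.
            JSONObject reload = new JSONObject();
            reload.put("type", "reload");
            reload.put("ingestLocations", "nc1");
            reload.put("computeLocations", "nc1,nc2");
            reload.put("storageLocations", "nc2");

            System.out.println(report);
            System.out.println(reload);
        }
    }
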
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
index 3721147..71d57d7 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -16,6 +16,8 @@
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -28,6 +30,11 @@
import org.json.JSONObject;
import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
public class FeedDataProviderServlet extends HttpServlet {
@@ -41,16 +48,22 @@
String dataverseName = request.getParameter("dataverse");
String report = getFeedReport(feedName, datasetName, dataverseName);
- System.out.println(" RECEIVED REPORT " + report);
+ System.out.println(" REPORT " + report);
long timestamp = System.currentTimeMillis();
-
- JSONObject obj = new JSONObject();
- JSONArray array = new JSONArray();
- try {
- obj.put("time", timestamp);
- obj.put("value", report);
- } catch (JSONException jsoe) {
- throw new IOException(jsoe);
+ JSONObject obj = null;
+ if (report != null) {
+ JSONArray array = new JSONArray();
+ try {
+ obj = new JSONObject();
+ obj.put("type", "report");
+ obj.put("time", timestamp);
+ obj.put("value", report);
+ } catch (JSONException jsoe) {
+ throw new IOException(jsoe);
+ }
+ } else {
+ obj = verifyIfFeedIsAlive(dataverseName, feedName, datasetName);
+ // TODO: obj may still be null here if the feed is not active; guard before use
}
PrintWriter out = response.getWriter();
@@ -68,4 +81,42 @@
}
return report;
}
+
+ private JSONObject verifyIfFeedIsAlive(String dataverseName, String feedName, String datasetName) {
+ JSONObject obj = null;
+ try {
+ MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
+ List<FeedActivity> feedActivities = MetadataManager.INSTANCE
+ .getActiveFeeds(ctx, dataverseName, datasetName);
+ boolean active = false;
+ FeedActivity activity = null;
+ for (FeedActivity feedActivity : feedActivities) {
+ active = (feedActivity.getFeedName().equals(feedName) && feedActivity.getActivityType().equals(
+ FeedActivityType.FEED_BEGIN));
+ if (active) {
+ activity = feedActivity;
+ break;
+ }
+ }
+ if (active) {
+ Map<String, String> activityDetails = activity.getFeedActivityDetails();
+ String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
+ String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
+ String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
+ obj = new JSONObject();
+ obj.put("type", "reload");
+ obj.put("ingestLocations", ingestLocations);
+ obj.put("computeLocations", computeLocations);
+ obj.put("storageLocations", storageLocations);
+ System.out.println(" RE LOADING " + " ingestion at " + ingestLocations + " compute at "
+ + computeLocations + " storage at " + storageLocations);
+ } else {
+ obj = null;
+ }
+ // the transaction is read-only; commit it to release the metadata locks
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return obj;
+ }
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 6b79958..94eefb6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -73,7 +73,6 @@
import edu.uci.ics.asterix.file.DatasetOperations;
import edu.uci.ics.asterix.file.DataverseOperations;
import edu.uci.ics.asterix.file.FeedOperations;
-import edu.uci.ics.asterix.file.FeedUtil;
import edu.uci.ics.asterix.file.IndexOperations;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.metadata.IDatasetDetails;
@@ -98,6 +97,7 @@
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
deleted file mode 100644
index a09ca3c..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
+++ /dev/null
@@ -1,174 +0,0 @@
-package edu.uci.ics.asterix.file;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
-import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
-import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory;
-import edu.uci.ics.hyracks.api.constraints.Constraint;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class FeedUtil {
-
- private static Logger LOGGER = Logger.getLogger(FeedUtil.class.getName());
-
- public static boolean isFeedActive(FeedActivity feedActivity) {
- return (feedActivity != null && !(feedActivity.getActivityType().equals(FeedActivityType.FEED_FAILURE) || feedActivity
- .getActivityType().equals(FeedActivityType.FEED_END)));
- }
-
- public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec,
- FeedConnectionId feedConnectionId, FeedPolicy feedPolicy) {
- JobSpecification altered = null;
- altered = new JobSpecification();
- Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
-
- // copy operators
- Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
- for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
- IOperatorDescriptor opDesc = entry.getValue();
- if (opDesc instanceof FeedIntakeOperatorDescriptor) {
- FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
- FeedIntakeOperatorDescriptor fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(),
- orig.getAdapterFactory(), (ARecordType) orig.getAtype(), orig.getRecordDescriptor(),
- orig.getFeedPolicy());
- oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
- } else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
- FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
- feedPolicy, FeedRuntimeType.STORAGE);
- oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
- } else {
- FeedRuntimeType runtimeType = null;
- if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
- IPushRuntimeFactory runtimeFactory = ((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline()
- .getRuntimeFactories()[0];
- if (runtimeFactory instanceof AssignRuntimeFactory) {
- runtimeType = FeedRuntimeType.COMPUTE;
- } else if (runtimeFactory instanceof StreamProjectRuntimeFactory) {
- runtimeType = FeedRuntimeType.COMMIT;
- }
- }
- FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
- feedPolicy, runtimeType);
-
- oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
- }
- }
-
- // copy connectors
- Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<ConnectorDescriptorId, ConnectorDescriptorId>();
- for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
- IConnectorDescriptor connDesc = entry.getValue();
- ConnectorDescriptorId newConnId = altered.createConnectorDescriptor(connDesc);
- connectorMapping.put(entry.getKey(), newConnId);
- }
-
- // make connections between operators
- for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : spec
- .getConnectorOperatorMap().entrySet()) {
- IConnectorDescriptor connDesc = altered.getConnectorMap().get(connectorMapping.get(entry.getKey()));
- Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
- Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
-
- IOperatorDescriptor leftOpDesc = altered.getOperatorMap().get(
- oldNewOID.get(leftOp.getLeft().getOperatorId()));
- IOperatorDescriptor rightOpDesc = altered.getOperatorMap().get(
- oldNewOID.get(rightOp.getLeft().getOperatorId()));
-
- altered.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
- }
-
- // prepare for setting partition constraints
- Map<OperatorDescriptorId, List<String>> operatorLocations = new HashMap<OperatorDescriptorId, List<String>>();
- Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<OperatorDescriptorId, Integer>();
-
- for (Constraint constraint : spec.getUserConstraints()) {
- LValueConstraintExpression lexpr = constraint.getLValue();
- ConstraintExpression cexpr = constraint.getRValue();
- OperatorDescriptorId opId;
- switch (lexpr.getTag()) {
- case PARTITION_COUNT:
- opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
- if (operatorCounts.get(opId) == null) {
- operatorCounts.put(opId, 1);
- } else {
- operatorCounts.put(opId, operatorCounts.get(opId) + 1);
- }
- break;
- case PARTITION_LOCATION:
- opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
- IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(opId));
- List<String> locations = operatorLocations.get(opDesc.getOperatorId());
- if (locations == null) {
- locations = new ArrayList<String>();
- operatorLocations.put(opDesc.getOperatorId(), locations);
- }
- String location = (String) ((ConstantExpression) cexpr).getValue();
- locations.add(location);
- break;
- }
- }
-
- // set absolute location constraints
- for (Entry<OperatorDescriptorId, List<String>> entry : operatorLocations.entrySet()) {
- IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
- PartitionConstraintHelper.addAbsoluteLocationConstraint(altered, opDesc,
- entry.getValue().toArray(new String[] {}));
- }
-
- // set count constraints
- for (Entry<OperatorDescriptorId, Integer> entry : operatorCounts.entrySet()) {
- IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
- if (!operatorLocations.keySet().contains(entry.getKey())) {
- PartitionConstraintHelper.addPartitionCountConstraint(altered, opDesc, entry.getValue());
- }
- }
-
- // useConnectorSchedulingPolicy
- altered.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
-
- // connectorAssignmentPolicy
- altered.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
-
- // roots
- for (OperatorDescriptorId root : spec.getRoots()) {
- altered.addRoot(altered.getOperatorMap().get(oldNewOID.get(root)));
- }
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("New Job Spec:" + altered);
- }
-
- return altered;
-
- }
-
-}
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index aa25bd8..365f2f5 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -36,6 +36,13 @@
function onFeedReportReceived(data) {
+ var type = data["type"];
+ if (type === "reload") {
+ ingestLocations = data["ingestLocations"];
+ computeLocations = data["computeLocations"];
+ storageLocations = data["storageLocations"];
+ document.location.reload(true);
+ } else {
var report = data["value"];
var tputArray = report.split("|");
var covered = 0;
@@ -49,6 +56,7 @@
ingestionTimeSeries[j].append(data["time"], 0);
}
ingestionTimeSeries[numIngestionNodes].append(data["time"], totalTput);
+ }
}
function myYRangeFunction(range) {
@@ -60,7 +68,7 @@
function initTimeline(ingestLocations) {
document.write("<i>" + "Feed Ingestion" + "<i>");
document.write("<br />" + "Ingestion Locations: " + ingestLocations);
document.write("<br />" + "Compute Locations: " + computeLocations);
document.write("<br />" + "Storage Locations: " + storageLocations);
document.write("<br />" + "Ingestion Policy: " + ingestionPolicy);
@@ -69,7 +77,7 @@
document.write("<br />");
for( var i = 0; i < numIngestionNodes; i++){
- graphNames[i] = ingestionNodes[i];
+ graphNames[i] = "Partition " + i;
}
if(numIngestionNodes > 1){