1) Minor fixes to the feed management console. 2) Added provision for specifying a (min, max) TPS range for the Twitter firehose adaptor.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
index a032db4..a89b6d5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -63,7 +63,6 @@
}
} else {
obj = verifyIfFeedIsAlive(dataverseName, feedName, datasetName);
- // do null check
}
PrintWriter out = response.getWriter();
@@ -75,7 +74,7 @@
LinkedBlockingQueue<String> queue = FeedLifecycleListener.INSTANCE.getFeedReportQueue(feedId);
String report = null;
try {
- report = queue.poll(30, TimeUnit.SECONDS);
+ report = queue.poll(5, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
@@ -83,35 +82,33 @@
}
private JSONObject verifyIfFeedIsAlive(String dataverseName, String feedName, String datasetName) {
- JSONObject obj = null;
+ JSONObject obj = new JSONObject();
try {
MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
List<FeedActivity> feedActivities = MetadataManager.INSTANCE
.getActiveFeeds(ctx, dataverseName, datasetName);
- boolean active = false;
- FeedActivity activity = null;
- for (FeedActivity feedActivity : feedActivities) {
- active = (feedActivity.getFeedName().equals(feedName) && feedActivity.getActivityType().equals(
- FeedActivityType.FEED_BEGIN));
- if (active) {
- activity = feedActivity;
+ FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
+ FeedActivity activity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(ctx, feedId, null);
+ switch (activity.getActivityType()) {
+ case FEED_BEGIN:
+ Map<String, String> activityDetails = activity.getFeedActivityDetails();
+ String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
+ String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
+ String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
+ obj.put("status", "active");
+ obj.put("type", "reload");
+ obj.put("ingestLocations", ingestLocations);
+ obj.put("computeLocations", computeLocations);
+ obj.put("storageLocations", storageLocations);
+ System.out.println(" RE LOADING " + " ingestion at " + ingestLocations + " compute at "
+ + computeLocations + " storage at " + storageLocations);
break;
- }
- }
- if (active) {
- Map<String, String> activityDetails = activity.getFeedActivityDetails();
- String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
- String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
- String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
- obj = new JSONObject();
- obj.put("type", "reload");
- obj.put("ingestLocations", ingestLocations);
- obj.put("computeLocations", computeLocations);
- obj.put("storageLocations", storageLocations);
- System.out.println(" RE LOADING " + " ingestion at " + ingestLocations + " compute at "
- + computeLocations + " storage at " + storageLocations);
- } else {
- obj = null;
+ case FEED_FAILURE:
+ obj.put("status", "failed");
+ break;
+ case FEED_END:
+ obj.put("status", "ended");
+ break;
}
} catch (Exception e) {
// ignore
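For reference, the servlet now always returns a JSON object whose "status" field reflects the most recent activity on the feed connection ("active", "failed", or "ended"). A minimal sketch of how a hypothetical client might consume this contract, using the org.json API (the class and method names below are illustrative only):

    import org.json.JSONObject;

    public class FeedStatusClient {
        // Hypothetical consumer of the /feed/data response produced above.
        public static void handle(JSONObject obj) throws Exception {
            String status = obj.getString("status");
            if ("active".equals(status)) {
                // For an active feed, "type" is "reload" and the location fields
                // carry the node lists for each stage of the pipeline.
                String ingest = obj.getString("ingestLocations");
                String compute = obj.getString("computeLocations");
                String storage = obj.getString("storageLocations");
                System.out.println("ingest=" + ingest + " compute=" + compute + " storage=" + storage);
            } else {
                // "failed" or "ended": the feed is no longer running.
                System.out.println("feed status: " + status);
            }
        }
    }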
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index 97a8576..a5f1813 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -5,7 +5,7 @@
<script type="text/javascript">
$(document).ready(function() {
var feedSeries = new TimeSeries();
-
+ var state = "ACTIVE";
var dataverse = "%s";
var dataset = "%s";
var feed = "%s";
@@ -24,7 +24,7 @@
var graphNames = new Array();
$.ajaxSetup({ cache: false });
- setInterval(fetchFeedReport, 500);
+ setInterval(fetchFeedReport, 5000);
function fetchFeedReport() {
$.ajax({
url: '/feed/data?dataverse=' + dataverse + '&dataset=' + dataset + '&feed=' + feed,
@@ -36,26 +36,36 @@
function onFeedReportReceived(data) {
- var type = data["type"];
- if (type == ("reload")) {
- ingestLocations = data["ingestLocations"];
- computeLocations = data["computeLocations"];
- storageLocations = data["storageLocations"];
+ var status = data["status"];
+ if(status == ("ended")){
+ ingestLocations = " ";
+ computeLocations = " ";
+ storageLocations = " ";
+ ingestionPolicy = " ";
+ state = "INACTIVE";
document.location.reload(true);
} else {
- var report = data["value"];
- var tputArray = report.split("|");
- var covered = 0;
- var totalTput = 0;
- for( var i = 0; i < tputArray.length-1; i ++){
- ingestionTimeSeries[i].append(data["time"], tputArray[i]);
- covered++;
- totalTput += parseInt(tputArray[i]);
- }
- for( var j = covered; j < numIngestionNodes; j++){
- ingestionTimeSeries[j].append(data["time"], 0);
- }
- ingestionTimeSeries[numIngestionNodes].append(data["time"], totalTput);
+ var type = data["type"];
+ if (type == ("reload")) {
+ ingestLocations = data["ingestLocations"];
+ computeLocations = data["computeLocations"];
+ storageLocations = data["storageLocations"];
+ document.location.reload(true);
+ } else {
+ var report = data["value"];
+ var tputArray = report.split("|");
+ var covered = 0;
+ var totalTput = 0;
+ for( var i = 0; i < tputArray.length-1; i ++){
+ ingestionTimeSeries[i].append(data["time"], tputArray[i]);
+ covered++;
+ totalTput += parseInt(tputArray[i]);
+ }
+ for( var j = covered; j < numIngestionNodes; j++){
+ ingestionTimeSeries[j].append(data["time"], 0);
+ }
+ ingestionTimeSeries[numIngestionNodes].append(data["time"], totalTput);
+ }
}
}
@@ -72,7 +82,7 @@
document.write("<br />" + "Compute Locations: " + computeLocations);
document.write("<br />" + "Storage Locations: " + storageLocations);
document.write("<br />" + "Ingestion Policy: " + ingestionPolicy);
- document.write("<br />" + "Active since" + activeSince);
+ document.write("<br />" + "Status: " + state);
document.write("<br />");
document.write("<br />");
@@ -103,7 +113,7 @@
}
function drawChart(chartName, ingestionTimeSeries) {
- var ingestionChart = new SmoothieChart({ minValue:0, millisPerPixel: 20, grid: { strokeStyle: '#555555', lineWidth: 1, millisPerLine: 1000, verticalSections: 10 }});
+ var ingestionChart = new SmoothieChart({ timestampFormatter:SmoothieChart.timeFormatter, interpolation:'linear', minValue:0, millisPerPixel: 20, grid: { strokeStyle: '#555555', lineWidth: 1, millisPerLine: 1000, verticalSections: 10 }});
ingestionChart.addTimeSeries(ingestionTimeSeries, seriesOptions);
ingestionChart.streamTo(document.getElementById(chartName, 500));
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index c06f884..a6c7c18 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -17,30 +17,70 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+/**
+ * A wrapper around the standard frame writer provided to an operator node pushable.
+ * The wrapper monitors the flow of data from this operator to a downstream operator
+ * over a connector. It collects statistics if required by the feed ingestion policy
+ * and reports them to the Super Feed Manager chosen for the feed. In addition any
+ * congestion experienced by the operator is also reported.
+ */
public class FeedFrameWriter implements IFrameWriter {
private static final Logger LOGGER = Logger.getLogger(FeedFrameWriter.class.getName());
+ /** The threshold (in milliseconds) on the time taken to push a frame to the network. **/
+ public static final long FLUSH_THRESHOLD_TIME = 5000; // 5 seconds
+
+ /** Actual frame writer provided to an operator. **/
private IFrameWriter writer;
+ /** The node pushable associated with the operator **/
private IOperatorNodePushable nodePushable;
- private final boolean collectStatistics;
+ /** Set to true if the operator's health needs to be monitored **/
+ private final boolean reportHealth;
+ /** A buffer for keeping frames that are waiting to be processed **/
private List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+ /**
+ * Mode associated with the frame writer
+ * Possible values: FORWARD, STORE
+ *
+ * @see Mode
+ */
private Mode mode;
- public static final long FLUSH_THRESHOLD_TIME = 5000;
+ /**
+ * Detects if the operator is unable to push a frame downstream
+ * within a threshold period of time. In addition, measures the
+ * throughput as observed on the output channel of the associated operator.
+ */
+ private HealthMonitor healthMonitor;
- private FramePushWait framePushWait;
-
+ /**
+ * Manages the scheduling of tasks.
+ */
private Timer timer;
+ /**
+ * Provides access to the tuples in a frame. Used in collecting statistics.
+ */
private FrameTupleAccessor fta;
public enum Mode {
+ /**
+ * Normal mode of operation for an operator when
+ * frames are pushed to the downstream operator.
+ */
FORWARD,
+
+ /**
+ * Failure mode of operation for an operator when
+ * input frames are not pushed to the downstream operator but
+ * are buffered for future retrieval.
+ */
STORE
}
@@ -50,13 +90,21 @@
this.writer = writer;
this.mode = Mode.FORWARD;
this.nodePushable = nodePushable;
- this.collectStatistics = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
- if (collectStatistics) {
+ this.reportHealth = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
+ if (reportHealth) {
timer = new Timer();
- framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, feedId, nodeId, feedRuntimeType,
- partition, FLUSH_THRESHOLD_TIME, timer);
-
- timer.scheduleAtFixedRate(framePushWait, 0, FLUSH_THRESHOLD_TIME);
+ healthMonitor = new HealthMonitor(FLUSH_THRESHOLD_TIME, feedId, nodeId, feedRuntimeType, partition,
+ FLUSH_THRESHOLD_TIME, timer);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Statistics collection enabled for the feed " + feedId + " " + feedRuntimeType + " ["
+ + partition + "]");
+ }
+ timer.scheduleAtFixedRate(healthMonitor, 0, FLUSH_THRESHOLD_TIME);
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Statistics collection *not* enabled for the feed " + feedId + " " + feedRuntimeType + " ["
+ + partition + "]");
+ }
}
this.fta = fta;
}
@@ -75,33 +123,16 @@
this.mode = newMode;
}
- public List<ByteBuffer> getStoredFrames() {
- return frames;
- }
-
- public void clear() {
- frames.clear();
- }
-
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- }
-
- public void reset() {
- framePushWait.reset();
- }
-
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
switch (mode) {
case FORWARD:
try {
- if (collectStatistics) {
+ if (reportHealth) {
fta.reset(buffer);
- framePushWait.notifyStart();
+ healthMonitor.notifyStart();
writer.nextFrame(buffer);
- framePushWait.notifyFinish(fta.getTupleCount());
+ healthMonitor.notifyFinish(fta.getTupleCount());
} else {
writer.nextFrame(buffer);
}
@@ -126,15 +157,17 @@
storageBuffer.put(buffer);
frames.add(storageBuffer);
storageBuffer.flip();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Stored frame for " + nodePushable);
+ }
break;
}
}
- private static class FramePushWait extends TimerTask {
+ private static class HealthMonitor extends TimerTask {
private long startTime = -1;
- private IOperatorNodePushable nodePushable;
- private State state;
+ private FramePushState state;
private long flushThresholdTime;
private static final String EOL = "\n";
private FeedConnectionId feedId;
@@ -146,11 +179,10 @@
private boolean collectThroughput;
private FeedMessageService mesgService;
- public FramePushWait(IOperatorNodePushable nodePushable, long flushThresholdTime, FeedConnectionId feedId,
- String nodeId, FeedRuntimeType feedRuntimeType, int partition, long period, Timer timer) {
- this.nodePushable = nodePushable;
+ public HealthMonitor(long flushThresholdTime, FeedConnectionId feedId, String nodeId,
+ FeedRuntimeType feedRuntimeType, int partition, long period, Timer timer) {
this.flushThresholdTime = flushThresholdTime;
- this.state = State.INTIALIZED;
+ this.state = FramePushState.INTIALIZED;
this.feedId = feedId;
this.nodeId = nodeId;
this.feedRuntimeType = feedRuntimeType;
@@ -161,57 +193,49 @@
public void notifyStart() {
startTime = System.currentTimeMillis();
- state = State.WAITING_FOR_FLUSH_COMPLETION;
-
+ state = FramePushState.WAITING_FOR_FLUSH_COMPLETION;
}
+ /**
+ * Invoked when a live operator instance needs to take over
+ * from the zombie instance left behind by a previously failed execution.
+ */
public void reset() {
mesgService = null;
collectThroughput = true;
}
public void notifyFinish(int numTuples) {
- state = State.WAITNG_FOR_NEXT_FRAME;
+ state = FramePushState.WAITNG_FOR_NEXT_FRAME;
numTuplesInInterval.set(numTuplesInInterval.get() + numTuples);
}
@Override
public void run() {
- if (state.equals(State.WAITING_FOR_FLUSH_COMPLETION)) {
+ if (state.equals(FramePushState.WAITING_FOR_FLUSH_COMPLETION)) {
long currentTime = System.currentTimeMillis();
if (currentTime - startTime > flushThresholdTime) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Congestion reported by " + feedRuntimeType + " [" + partition + "]");
}
- sendReportToSFM(currentTime - startTime, FeedReportMessageType.CONGESTION);
+ sendReportToSFM(currentTime - startTime, FeedReportMessageType.CONGESTION,
+ System.currentTimeMillis());
}
}
if (collectThroughput) {
- System.out.println(" NUMBER of TUPLES " + numTuplesInInterval.get() + " in " + period
- + " for partition " + partition);
int instantTput = (int) Math.ceil((((double) numTuplesInInterval.get() * 1000) / period));
- sendReportToSFM(instantTput, FeedReportMessageType.THROUGHPUT);
+ sendReportToSFM(instantTput, FeedReportMessageType.THROUGHPUT, System.currentTimeMillis());
}
numTuplesInInterval.set(0);
}
- private void sendReportToSFM(long value, SuperFeedManager.FeedReportMessageType mesgType) {
- String feedRep = feedId.getDataverse() + ":" + feedId.getFeedName() + ":" + feedId.getDatasetName();
- String operator = "" + feedRuntimeType;
- String message = mesgType.name().toLowerCase() + "|" + feedRep + "|" + operator + "|" + partition + "|"
- + value + "|" + nodeId + "|" + EOL;
+ private void sendReportToSFM(long value, SuperFeedManager.FeedReportMessageType mesgType, long timestamp) {
if (mesgService == null) {
- while (mesgService == null) {
- mesgService = FeedManager.INSTANCE.getFeedMessageService(feedId);
- if (mesgService == null) {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
+ waitTillMessageServiceIsUp();
}
+ String feedRep = feedId.getDataverse() + ":" + feedId.getFeedName() + ":" + feedId.getDatasetName();
+ String message = mesgType.name().toLowerCase() + "|" + feedRep + "|" + feedRuntimeType + "|" + partition
+ + "|" + value + "|" + nodeId + "|" + timestamp + "|" + EOL;
try {
mesgService.sendMessage(message);
} catch (IOException ioe) {
@@ -223,17 +247,43 @@
}
}
+ private void waitTillMessageServiceIsUp() {
+ while (mesgService == null) {
+ mesgService = FeedManager.INSTANCE.getFeedMessageService(feedId);
+ if (mesgService == null) {
+ try {
+ /**
+ * wait for the message service to be available
+ */
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Encountered an interrupted exception " + " Exception " + e);
+ }
+ }
+ }
+ }
+ }
+
public void deactivate() {
+ this.cancel();
collectThroughput = false;
}
- public void activate() {
- collectThroughput = true;
- }
-
- private enum State {
+ private enum FramePushState {
+ /**
+ * Frame writer has been initialized
+ */
INTIALIZED,
+
+ /**
+ * Frame writer is waiting for a pending flush to finish.
+ */
WAITING_FOR_FLUSH_COMPLETION,
+
+ /**
+ * Frame writer is waiting to be given the next frame.
+ */
WAITNG_FOR_NEXT_FRAME
}
@@ -242,21 +292,19 @@
@Override
public void fail() throws HyracksDataException {
writer.fail();
- if (framePushWait != null && !framePushWait.feedRuntimeType.equals(FeedRuntimeType.INGESTION)) {
- framePushWait.cancel();
- framePushWait.deactivate();
+ if (healthMonitor != null && !healthMonitor.feedRuntimeType.equals(FeedRuntimeType.INGESTION)) {
+ healthMonitor.deactivate();
}
- framePushWait.reset();
+ healthMonitor.reset();
}
@Override
public void close() throws HyracksDataException {
- if (framePushWait != null) {
+ if (healthMonitor != null) {
+ healthMonitor.deactivate();
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Closing frame statistics collection activity" + framePushWait);
+ LOGGER.info("Closing frame statistics collection activity" + healthMonitor);
}
- framePushWait.deactivate();
- framePushWait.cancel();
}
writer.close();
}
@@ -274,4 +322,21 @@
return "MaterializingFrameWriter using " + writer;
}
+ public List<ByteBuffer> getStoredFrames() {
+ return frames;
+ }
+
+ public void clear() {
+ frames.clear();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ }
+
+ public void reset() {
+ healthMonitor.reset();
+ }
+
}
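The report assembled in sendReportToSFM above is a pipe-delimited string of the form type|dataverse:feed:dataset|runtimeType|partition|value|nodeId|timestamp|. A minimal sketch of a matching parser (hypothetical helper; the field order mirrors the producer):

    public class FeedReportParser {
        // Splits a report message produced by HealthMonitor.sendReportToSFM.
        public static void parse(String message) {
            String[] f = message.split("\\|");
            String type = f[0];                      // "throughput" or "congestion"
            String feedRep = f[1];                   // dataverse:feedName:datasetName
            String runtimeType = f[2];               // e.g. INGESTION, COMPUTE
            int partition = Integer.parseInt(f[3]);
            long value = Long.parseLong(f[4]);       // tuples/sec, or wait time in ms
            String nodeId = f[5];
            long timestamp = Long.parseLong(f[6]);
            System.out.println(type + " from " + feedRep + " [" + partition + "] = " + value
                    + " at " + nodeId + " (" + timestamp + ")");
        }
    }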
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index efa6cbb..5c8c422 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -31,17 +31,25 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
/**
- * Operator responsible for ingesting data from an external source. This
- * operator uses a (configurable) adapter associated with the feed dataset.
+ * FeedIntakeOperatorDescriptor is responsible for ingesting data from an external source. This
+ * operator uses a user-specified or built-in adaptor for retrieving data from the external
+ * data source.
*/
public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
+ /** The type associated with the ADM data output from the feed adaptor*/
private final IAType atype;
+
+ /** unique identifier for a feed instance.*/
private final FeedConnectionId feedId;
+
+ /** Map representation of policy parameters */
private final Map<String, String> feedPolicy;
+
+ /** The adaptor factory that is used to create an instance of the feed adaptor**/
private IAdapterFactory adapterFactory;
public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, IAdapterFactory adapterFactory,
@@ -73,6 +81,9 @@
}
}
} catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Initialization of the feed adaptor failed");
+ }
throw new HyracksDataException("initialization of adapter failed", e);
}
return new FeedIntakeOperatorNodePushable(ctx, feedId, adapter, feedPolicy, partition, ingestionRuntime);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 5ed4ccc..58615fc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -22,7 +22,6 @@
import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
index 2e31f6d..40b2f26 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
@@ -9,6 +9,10 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+/**
+ * Sends feed report messages on behalf of an operator instance
+ * to the SuperFeedManager associated with the feed.
+ */
public class FeedMessageService {
private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 78b03ee..cc597cc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -1,7 +1,6 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -14,7 +13,6 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -22,15 +20,42 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
+/**
+ * FeedMetaOperatorDescriptor is a wrapper operator that provides a sandbox-like
+ * environment for a Hyracks operator that is part of a feed ingestion pipeline.
+ * The MetaFeed operator provides an interface identical to that offered by the
+ * underlying wrapped operator, hereafter referred to as the core operator.
+ * As seen by Hyracks, the altered pipeline is identical to the earlier version formed
+ * from core operators. The MetaFeed operator enhances each core operator by providing
+ * functionality for handling runtime exceptions, saving any state for future retrieval,
+ * and measuring/reporting of performance characteristics, all of which contributes to
+ * providing fault tolerance.
+ */
+
public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(FeedMetaOperatorDescriptor.class.getName());
+ /** The actual (Hyracks) operator that is wrapped by the MetaFeed operator **/
private IOperatorDescriptor coreOperator;
+
+ /**
+ * A unique identifier for the feed instance. A feed instance represents the flow of data
+ * from a feed to a dataset.
+ **/
private final FeedConnectionId feedConnectionId;
+
+ /**
+ * The policy associated with the feed instance.
+ */
private final FeedPolicy feedPolicy;
+
+ /**
+ * The type of the feed runtime associated with the operator.
+ * Possible values: INGESTION, COMPUTE, STORAGE, COMMIT
+ */
private final FeedRuntimeType runtimeType;
public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
@@ -59,16 +84,44 @@
private static class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ /** Runtime node pushable corresponding to the core feed operator **/
private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperatorNodePushable;
+
+ /**
+ * A policy enforcer that ensures dynamic decisions for a feed are taken in accordance
+ * with the associated ingestion policy
+ **/
private FeedPolicyEnforcer policyEnforcer;
+
+ /**
+ * The Feed Runtime instance associated with the operator. Feed Runtime captures the state of the operator while
+ * the feed is active.
+ */
private FeedRuntime feedRuntime;
+
+ /**
+ * A unique identifier for the feed instance. A feed instance represents the flow of data
+ * from a feed to a dataset.
+ **/
private FeedConnectionId feedId;
+
+ /** Denotes the i'th operator instance in a setting where K operator instances are scheduled to run in parallel **/
private int partition;
+
+ /** A buffer that is used to hold the current frame that is being processed **/
private ByteBuffer currentBuffer;
+
+ /** Type associated with the core feed operator **/
private final FeedRuntimeType runtimeType;
+
+ /** True if the feed is recovering from a previous failed execution **/
private boolean resumeOldState;
- private ExecutorService feedExecService;
+
+ /** The Node Controller ID for the host NC **/
private String nodeId;
+
+ /** Allows iterating over the tuples in a frame **/
private FrameTupleAccessor fta;
public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
@@ -93,21 +146,25 @@
feedRuntime = new FeedRuntime(feedId, partition, runtimeType);
FeedManager.INSTANCE.registerFeedRuntime(feedRuntime);
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Did not find a saved state, starting fresh for " + runtimeType + " node.");
+ LOGGER.warning("Did not find a saved state from a previous zombie, starting a new instance for "
+ + runtimeType + " node.");
}
resumeOldState = false;
} else {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Resuming from saved state (if any) of " + runtimeType + " node.");
+ LOGGER.warning("Retreived state from the zombie instance from previous execution for "
+ + runtimeType + " node.");
}
resumeOldState = true;
}
- feedExecService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
runtimeType, partition, fta);
coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
coreOperatorNodePushable.open();
} catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Unable to initialize feed operator " + feedRuntime + " [" + partition + "]");
+ }
throw new HyracksDataException(e);
}
}
@@ -117,7 +174,8 @@
try {
if (resumeOldState) {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Old state " + feedRuntime.getRuntimeState().getFrame());
+ LOGGER.warning("State from previous zombie instance "
+ + feedRuntime.getRuntimeState().getFrame());
}
coreOperatorNodePushable.nextFrame(feedRuntime.getRuntimeState().getFrame());
feedRuntime.setRuntimeState(null);
@@ -131,8 +189,18 @@
// log the tuple
FeedRuntimeState runtimeState = new FeedRuntimeState(buffer, writer, e);
feedRuntime.setRuntimeState(runtimeState);
+ String message = e.getMessage();
+ String tIndexString = message != null ? message.substring(message.lastIndexOf(':') + 1) : null;
+ int tupleIndex = 0;
+ if (tIndexString != null) {
+ try {
+ tupleIndex = Integer.parseInt(tIndexString.trim());
+ } catch (NumberFormatException nfe) {
+ // fall back to the first tuple if the index cannot be parsed
+ }
+ }
+ fta.reset(buffer);
+ int endOffset = fta.getTupleEndOffset(tupleIndex);
+ buffer.flip();
+ buffer.position(endOffset + 1);
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Harmful exception (parked data) " + e);
+ LOGGER.warning("Harmful exception (parked data) tupleIndex " + tupleIndex + e);
}
} else {
// ignore the frame (exception is expected)
@@ -140,8 +208,10 @@
LOGGER.warning("Ignoring exception " + e);
}
}
-
} else {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Feed policy does not require feed to survive soft failure");
+ }
throw e;
}
}
@@ -156,7 +226,7 @@
return false;
} else {
if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.warning("Received duplicate key exception!");
+ LOGGER.severe("Received duplicate key exception!");
}
return true;
}
@@ -199,12 +269,4 @@
return coreOperator;
}
- public FeedConnectionId getFeedConnectionId() {
- return feedConnectionId;
- }
-
- public FeedPolicy getFeedPolicy() {
- return feedPolicy;
- }
-
}
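The tuple-index recovery in FeedMetaNodePushable above assumes a convention on the exception message: the failing core operator is expected to report the offending tuple as a trailing ":<index>" suffix (e.g. "Failure processing tuple:42"). A minimal defensive sketch of that parse (hypothetical helper; the suffix convention is an assumption inferred from the patch):

    public class TupleIndexUtil {
        // Extract the failing tuple index from an exception message assumed
        // to end with ":<index>"; returns 0 when no index can be recovered.
        static int failingTupleIndex(Exception e) {
            String message = e.getMessage();
            int colon = (message == null) ? -1 : message.lastIndexOf(':');
            if (colon < 0) {
                return 0; // no index reported; fall back to the first tuple
            }
            try {
                return Integer.parseInt(message.substring(colon + 1).trim());
            } catch (NumberFormatException nfe) {
                return 0;
            }
        }
    }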
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
index d0a83d8..fbff931 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
@@ -13,10 +13,22 @@
private int partition = -1;
private FeedRuntimeType runtimeType;
private long value = -1;
- private final String[] representation;
+ private String[] representation;
- public FeedReport(String rep) {
- representation = rep.split("\\|");
+ public FeedReport() {
+ }
+
+ public FeedReport(String message) {
+ representation = message.split("\\|");
+ }
+
+ public void reset(String message) {
+ representation = message.split("\\|");
+ reportType = null;
+ feedId = null;
+ runtimeType = null;
+ partition = -1;
+ value = -1;
}
@Override
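The new no-arg constructor plus reset(String) allow a single FeedReport instance to be reused across messages (as FeedReportProvider now does), avoiding one allocation per report. A usage sketch (the message literal is illustrative):

    public class FeedReportReuse {
        public static void main(String[] args) {
            FeedReport report = new FeedReport();
            String[] messages = { "throughput|dv:feed:ds|INGESTION|0|120|nc1|0|" };
            for (String m : messages) {
                report.reset(m); // re-parse in place; cached fields are cleared
            }
        }
    }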
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index 18177c2..a45456b6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -33,24 +33,51 @@
import edu.uci.ics.asterix.metadata.feeds.MessageListener.IMessageAnalyzer;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+/**
+ * The feed operators running in an NC report their health (statistics) to the local Feed Manager.
+ * A feed thus has a Feed Manager per NC. From amongst the Feed Managers, a SuperFeedManager is chosen (randomly).
+ * The SuperFeedManager collects reports from the Feed Managers and thus has a global cluster view of
+ * how the different feed operators running in a distributed fashion are performing.
+ */
public class SuperFeedManager {
private static final Logger LOGGER = Logger.getLogger(SuperFeedManager.class.getName());
+
+ /**
+ * IP Address or DNS name of the host where Super Feed Manager is running.
+ */
private String host;
private AtomicInteger availablePort; // starting value is fixed
+ /**
+ * The port at which the SuperFeedManager listens for connections by other Feed Managers.
+ */
private final int feedReportPort; // fixed
+ /**
+ * The port at which the SuperFeedManager listens for connections by clients that wish
+ * to subscribe to the feed health reports, e.g., the feed management console.
+ */
private final int feedReportSubscribePort; // fixed
+ /**
+ * The id of the Node Controller.
+ */
private final String nodeId;
+ /**
+ * A unique identifier for the feed instance. A feed instance represents the flow of data
+ * from a feed to a dataset.
+ **/
private final FeedConnectionId feedConnectionId;
+ /**
+ * Set to true if the Super Feed Manager is local to the NC.
+ **/
private boolean isLocal = false;
- private SuperFeedReportService sfmService;
+ private FeedReportDestinationSocketProvider sfmService;
private SuperFeedReportSubscriptionService subscriptionService;
@@ -102,7 +129,8 @@
public void start() throws IOException {
if (sfmService == null) {
ExecutorService executorService = FeedManager.INSTANCE.getFeedExecutorService(feedConnectionId);
- sfmService = new SuperFeedReportService(feedReportPort, feedReportInbox, feedConnectionId, availablePort);
+ sfmService = new FeedReportDestinationSocketProvider(feedReportPort, feedReportInbox, feedConnectionId,
+ availablePort);
executorService.execute(sfmService);
subscriptionService = new SuperFeedReportSubscriptionService(feedConnectionId, feedReportSubscribePort,
sfmService.getMesgAnalyzer(), availablePort);
@@ -144,10 +172,10 @@
private AtomicInteger subscriptionPort;
private boolean active = true;
private String EOM = "\n";
- private final SFMessageAnalyzer reportProvider;
+ private final FeedReportProvider reportProvider;
private final List<FeedDataProviderService> dataProviders = new ArrayList<FeedDataProviderService>();
- public SuperFeedReportSubscriptionService(FeedConnectionId feedId, int port, SFMessageAnalyzer reportProvider,
+ public SuperFeedReportSubscriptionService(FeedConnectionId feedId, int port, FeedReportProvider reportProvider,
AtomicInteger nextPort) throws IOException {
this.feedId = feedId;
serverFeedSubscribe = FeedManager.INSTANCE.getFeedRuntimeManager(feedId).createServerSocket(port);
@@ -238,36 +266,35 @@
}
- private static class SuperFeedReportService implements Runnable {
+ private static class FeedReportDestinationSocketProvider implements Runnable {
+
+ private static final String EOM = "\n";
private AtomicInteger nextPort;
- private final ServerSocket serverFeedReport;
-
- private String EOM = "\n";
-
+ private final ServerSocket feedReportSocket;
private final LinkedBlockingQueue<String> inbox;
private final List<MessageListener> messageListeners;
- private final SFMessageAnalyzer mesgAnalyzer;
+ private final FeedReportProvider mesgAnalyzer;
private final FeedConnectionId feedId;
private boolean process = true;
- public SuperFeedReportService(int port, LinkedBlockingQueue<String> inbox, FeedConnectionId feedId,
- AtomicInteger availablePort) throws IOException {
+ public FeedReportDestinationSocketProvider(int port, LinkedBlockingQueue<String> inbox,
+ FeedConnectionId feedId, AtomicInteger availablePort) throws IOException {
FeedRuntimeManager runtimeManager = FeedManager.INSTANCE.getFeedRuntimeManager(feedId);
- serverFeedReport = runtimeManager.createServerSocket(port);
- nextPort = availablePort;
+ this.feedReportSocket = runtimeManager.createServerSocket(port);
+ this.nextPort = availablePort;
this.inbox = inbox;
this.feedId = feedId;
this.messageListeners = new ArrayList<MessageListener>();
- mesgAnalyzer = new SFMessageAnalyzer(inbox, feedId);
+ this.mesgAnalyzer = new FeedReportProvider(inbox, feedId);
FeedManager.INSTANCE.getFeedExecutorService(feedId).execute(mesgAnalyzer);
}
public void stop() {
process = false;
- if (serverFeedReport != null) {
+ if (feedReportSocket != null) {
try {
- serverFeedReport.close();
+ feedReportSocket.close();
process = false;
} catch (IOException e) {
e.printStackTrace();
@@ -284,12 +311,18 @@
Socket client = null;
while (process) {
try {
- client = serverFeedReport.accept();
- OutputStream os = client.getOutputStream();
+ client = feedReportSocket.accept();
int port = nextPort.incrementAndGet();
+ /**
+ * MessageListener provides the functionality of listening at a port for messages
+ * and delivering each received message to an input queue (inbox).
+ */
MessageListener listener = new MessageListener(port, inbox);
listener.start();
- messageListeners.add(listener);
+ synchronized (messageListeners) {
+ messageListeners.add(listener);
+ }
+ OutputStream os = client.getOutputStream();
os.write((port + EOM).getBytes());
os.flush();
} catch (IOException e) {
@@ -308,13 +341,18 @@
}
}
- public SFMessageAnalyzer getMesgAnalyzer() {
+ public FeedReportProvider getMesgAnalyzer() {
return mesgAnalyzer;
}
}
- private static class SFMessageAnalyzer implements Runnable {
+ /**
+ * The report messages sent by the feed operators are delivered to the FeedReportProvider,
+ * which is responsible for distributing the messages to the subscribers.
+ * The Feed Management Console is an example of a subscriber.
+ */
+ private static class FeedReportProvider implements Runnable {
private final LinkedBlockingQueue<String> inbox;
private final FeedConnectionId feedId;
@@ -322,13 +360,12 @@
private final List<LinkedBlockingQueue<String>> subscriptionQueues;
private final Map<String, String> ingestionThroughputs;
- public SFMessageAnalyzer(LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
+ public FeedReportProvider(LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
throws UnknownHostException, IOException {
this.inbox = inbox;
this.feedId = feedId;
this.subscriptionQueues = new ArrayList<LinkedBlockingQueue<String>>();
this.ingestionThroughputs = new HashMap<String, String>();
- String ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
}
public void stop() {
@@ -345,170 +382,62 @@
public void run() {
StringBuilder finalMessage = new StringBuilder();
+ FeedReport report = new FeedReport();
while (process) {
try {
String message = inbox.take();
- FeedReport report = new FeedReport(message);
+ report.reset(message);
FeedReportMessageType mesgType = report.getReportType();
switch (mesgType) {
case THROUGHPUT:
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.warning("SuperFeedManager received message " + message);
+ LOGGER.warning("Feed Health Report " + message);
}
- for (LinkedBlockingQueue<String> q : subscriptionQueues) {
- String[] comp = message.split("\\|");
- String parition = comp[3];
- String tput = comp[4];
- String location = comp[5];
- ingestionThroughputs.put(parition, tput);
- // throughput in partition order
+ String[] msgComponents = message.split("\\|");
+ String partition = msgComponents[3];
+ String tput = msgComponents[4];
+ String timestamp = msgComponents[6];
+
+ boolean dispatchReport = true;
+ if (ingestionThroughputs.get(partition) == null) {
+ ingestionThroughputs.put(partition, tput);
+ dispatchReport = false;
+ } else {
for (int i = 0; i < ingestionThroughputs.size(); i++) {
String tp = ingestionThroughputs.get(i + "");
- finalMessage.append(tp + "|");
+ if (tp != null) {
+ ingestionThroughputs.put(i + "", null);
+ finalMessage.append(tp + "|");
+ } else {
+ dispatchReport = false;
+ break;
+ }
}
- q.add(finalMessage.toString());
- finalMessage.delete(0, finalMessage.length());
+ ingestionThroughputs.put(partition, tput);
}
+
+ if (dispatchReport) {
+ String dispatchedReport = finalMessage.toString();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Dispatched report " + dispatchedReport);
+ }
+ for (LinkedBlockingQueue<String> q : subscriptionQueues) {
+ q.add(dispatchedReport);
+ }
+ }
+ finalMessage.delete(0, finalMessage.length());
break;
case CONGESTION:
// congestionInbox.add(report);
break;
}
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning(message);
- }
} catch (InterruptedException e) {
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Unable to process messages " + e.getMessage());
+ LOGGER.info("Unable to process messages " + e.getMessage() + " for feed " + feedId);
}
}
}
}
- private static class SuperFeedManagerMessageAnalzer implements IMessageAnalyzer {
-
- private String ccHost;
- private CongestionAnalyzer congestionAnalyzer;
- private LinkedBlockingQueue<FeedReport> congestionInbox = new LinkedBlockingQueue<FeedReport>();
- private ExecutorService executorService;
- private LinkedBlockingQueue<String> mesgInbox = new LinkedBlockingQueue<String>();
-
- public SuperFeedManagerMessageAnalzer(FeedConnectionId feedId) {
- ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
- congestionAnalyzer = new CongestionAnalyzer(congestionInbox);
- executorService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
- executorService.execute(congestionAnalyzer);
- }
-
- public void receiveMessage(String message) {
- Socket socket = null;
- OutputStream os = null;
- try {
- FeedReport report = new FeedReport(message);
- FeedReportMessageType mesgType = report.getReportType();
- switch (mesgType) {
- case THROUGHPUT:
- //send message to FeedHealthDataReceiver at CC (2999)
- socket = new Socket(ccHost, 2999);
- os = socket.getOutputStream();
- os.write(message.getBytes());
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.warning("SuperFeedManager received message " + message);
- }
- break;
- case CONGESTION:
- congestionInbox.add(report);
- break;
- }
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning(message);
- }
- if (os != null) {
- os.close();
- }
- if (socket != null) {
- socket.close();
- }
- } catch (IOException ioe) {
- ioe.printStackTrace();
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("unable to send message to FeedHealthDataReceiver");
- }
- }
- }
-
- @Override
- public LinkedBlockingQueue<String> getMessageQueue() {
- return mesgInbox;
- }
-
- }
-
- private static class CongestionAnalyzer implements Runnable {
-
- private final LinkedBlockingQueue<FeedReport> inbox;
- private FeedReport lastMaxReport;
- private int congestionCount;
- private long lastReportedCongestion = 0;
- private long closeEnoughTimeBound = FeedFrameWriter.FLUSH_THRESHOLD_TIME * 2;
-
- public CongestionAnalyzer(LinkedBlockingQueue<FeedReport> inbox) {
- this.inbox = inbox;
- }
-
- @Override
- public void run() {
- FeedReport report;
- while (true) {
- try {
- report = inbox.take();
- long currentReportedCongestionTime = System.currentTimeMillis();
- boolean closeEnough = lastReportedCongestion == 0
- || currentReportedCongestionTime - lastReportedCongestion < closeEnoughTimeBound;
- if (lastMaxReport == null) {
- lastMaxReport = report;
- if (closeEnough) {
- congestionCount++;
- }
- } else {
- if (report.compareTo(lastMaxReport) > 0) {
- lastMaxReport = report;
- congestionCount = 1;
- } else if (report.compareTo(lastMaxReport) == 0) {
- lastMaxReport = report;
- if (closeEnough) {
- congestionCount++;
- if (congestionCount > 5) {
- FeedRuntimeType sourceOfCongestion = null;
- switch (lastMaxReport.getRuntimeType()) {
- case INGESTION:
- sourceOfCongestion = FeedRuntimeType.COMPUTE;
- break;
- case COMPUTE:
- sourceOfCongestion = FeedRuntimeType.STORAGE;
- break;
- case STORAGE:
- case COMMIT:
- sourceOfCongestion = FeedRuntimeType.COMMIT;
- break;
- }
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning(" Need elasticity at " + sourceOfCongestion
- + " as per report " + lastMaxReport);
- }
- }
- }
- }
- }
- lastReportedCongestion = System.currentTimeMillis();
-
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- }
-
- }
}
-}
\ No newline at end of file
+}
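FeedReportProvider's dispatch logic above holds one throughput sample per partition and only emits a combined report once every partition has an unconsumed sample. A minimal sketch of that gate in isolation (hypothetical helper; partition keys are stringified ints, as in the code):

    import java.util.Map;

    public class ReportGate {
        // Returns the combined "tput0|tput1|...|" report if all partitions have
        // reported since the last dispatch, else null; consumed samples are cleared.
        static String combineIfComplete(Map<String, String> tputs, int numPartitions) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < numPartitions; i++) {
                String tp = tputs.get(i + "");
                if (tp == null) {
                    return null; // some partition has not reported yet
                }
                sb.append(tp).append("|");
            }
            for (int i = 0; i < numPartitions; i++) {
                tputs.put(i + "", null);
            }
            return sb.toString();
        }
    }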
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java
index 703aacb..6d0b1f9 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java
@@ -16,6 +16,10 @@
public static final String KEY_DURATION = "duration";
public static final String KEY_TPS = "tps";
+ public static final String KEY_MIN_TPS = "tps-min";
+ public static final String KEY_MAX_TPS = "tps-max";
+
public static final String KEY_TPUT_DURATION = "tput-duration";
public static final String OUTPUT_FORMAT = "output-format";
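With the new keys, the adaptor can be driven either at a fixed rate ("tps") or over a random range ("tps-min"/"tps-max"). A sketch of a hypothetical configuration map (the "mode" key name and value are assumptions; only the TPS key names come from this patch):

    import java.util.HashMap;
    import java.util.Map;

    public class FirehoseConfigExample {
        public static void main(String[] args) {
            Map<String, String> conf = new HashMap<String, String>();
            conf.put("mode", "controlled");      // assumed key/value for Mode.CONTROLLED
            conf.put("tps-min", "500");          // KEY_MIN_TPS
            conf.put("tps-max", "2000");         // KEY_MAX_TPS
            conf.put("tput-duration", "5");      // throughput report period (seconds)
            System.out.println(conf);
        }
    }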
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index cc1c597..5739c70 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -8,6 +8,7 @@
import java.net.UnknownHostException;
import java.util.Date;
import java.util.Map;
+import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
@@ -25,7 +26,6 @@
/**
* TPS can be configured between 1 and 20,000
*
- * @author ramang
*/
public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IFeedAdapter {
@@ -134,7 +134,9 @@
private Socket socket;
private TweetGenerator2 tweetGenerator;
private boolean continuePush = true;
- private int tps;
+ private int fixedTps = -1;
+ private int minTps = -1;
+ private int maxTps = -1;
private int tputDuration;
private int partition;
private Rate task;
@@ -144,7 +146,7 @@
public static enum Mode {
AGGRESSIVE,
- CONTROLLED
+ CONTROLLED,
}
public void setPartition(int partition) {
@@ -156,16 +158,42 @@
String datasetName) throws Exception {
this.serverSocket = serverSocket;
this.tweetGenerator = new TweetGenerator2(configuration, 0, TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
- tps = Integer.parseInt(configuration.get(TweetGenerator2.KEY_TPS));
- tputDuration = Integer.parseInt(configuration.get(TweetGenerator2.KEY_TPUT_DURATION));
- task = new Rate(tweetGenerator, tputDuration, datasetName, partition);
String value = configuration.get(KEY_MODE);
+ String confValue = null;
if (value != null) {
mode = Mode.valueOf(value.toUpperCase());
+ switch (mode) {
+ case AGGRESSIVE:
+ break;
+ case CONTROLLED:
+ confValue = configuration.get(TweetGenerator2.KEY_TPS);
+ if (confValue != null) {
+ minTps = Integer.parseInt(confValue);
+ maxTps = minTps;
+ fixedTps = minTps;
+ } else {
+ confValue = configuration.get(TweetGenerator2.KEY_MIN_TPS);
+ if (confValue != null) {
+ minTps = Integer.parseInt(confValue);
+ }
+ confValue = configuration.get(TweetGenerator2.KEY_MAX_TPS);
+ if (confValue != null) {
+ maxTps = Integer.parseInt(configuration.get(TweetGenerator2.KEY_MAX_TPS));
+ }
+
+ if (minTps < 0 || maxTps < 0 || minTps > maxTps) {
+ throw new IllegalArgumentException("Incorrect value for min/max TPS");
+ }
+ }
+
+ }
} else {
mode = Mode.AGGRESSIVE;
}
+ tputDuration = Integer.parseInt(configuration.get(TweetGenerator2.KEY_TPUT_DURATION));
+ task = new Rate(tweetGenerator, tputDuration, datasetName, partition);
+
}
@Override
@@ -180,7 +208,14 @@
timer.schedule(task, tputDuration * 1000, tputDuration * 1000);
long startBatch;
long endBatch;
+ Random random = new Random();
+ int tps = 0;
while (moreData && continuePush) {
+ if (maxTps > 0) {
+ tps = minTps + random.nextInt((maxTps+1) - minTps);
+ } else {
+ tps = fixedTps;
+ }
startBatch = System.currentTimeMillis();
moreData = tweetGenerator.setNextRecordBatch(tps);
endBatch = System.currentTimeMillis();
@@ -253,9 +288,7 @@
public void setPartition(int partition) {
this.partition = partition;
}
-
}
-
}
@Override
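The data-push loop above now draws a fresh TPS value per batch: uniformly from [minTps, maxTps] when a range is configured, otherwise the fixed rate. The sampling in isolation (the values are illustrative):

    import java.util.Random;

    public class TpsSampler {
        public static void main(String[] args) {
            int minTps = 500, maxTps = 2000, fixedTps = -1;
            Random random = new Random();
            // nextInt's bound is exclusive, so (maxTps + 1) - minTps makes maxTps reachable.
            int tps = (maxTps > 0) ? minTps + random.nextInt((maxTps + 1) - minTps) : fixedTps;
            System.out.println("batch rate: " + tps + " tweets/sec");
        }
    }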
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 241ea6c..1d8a607 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -14,7 +14,10 @@
*/
package edu.uci.ics.asterix.tools.external.data;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
@@ -24,6 +27,8 @@
import edu.uci.ics.asterix.om.types.AUnorderedListType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -46,9 +51,15 @@
/*
* Degree of parallelism for feed ingestion activity. Defaults to 1.
+ * This builds up the count constraint for the ingestion operator.
*/
private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
+ /*
+ * The absolute locations where ingestion operator instances will be places.
+ */
+ private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
+
private static final ARecordType outputType = initOutputType();
@Override
@@ -76,8 +87,23 @@
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
String ingestionCardinalityParam = (String) configuration.get(KEY_INGESTION_CARDINALITY);
- int requiredCardinality = ingestionCardinalityParam != null ? Integer.parseInt(ingestionCardinalityParam) : 1;
- return new AlgebricksCountPartitionConstraint(requiredCardinality);
+ String ingestionLocationParam = (String) configuration.get(KEY_INGESTION_LOCATIONS);
+ String[] locations = null;
+ if (ingestionLocationParam != null) {
+ locations = ingestionLocationParam.split(",");
+ }
+ int count = locations != null ? locations.length : 1;
+ if (ingestionCardinalityParam != null) {
+ count = Integer.parseInt(ingestionCardinalityParam);
+ }
+
+ List<String> chosenLocations = new ArrayList<String>();
+ String[] availableLocations = locations != null ? locations : AsterixClusterProperties.INSTANCE
+ .getParticipantNodes().toArray(new String[] {});
+ for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
+ chosenLocations.add(availableLocations[k]);
+ }
+ return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
}
@Override
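The new partition-constraint logic assigns ingestion instances to explicit NC locations, wrapping around the location list when the requested cardinality exceeds it. The round-robin assignment in isolation (node names are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class LocationAssignment {
        public static void main(String[] args) {
            String[] availableLocations = { "nc1", "nc2" };
            int count = 5; // ingestion-cardinality
            List<String> chosen = new ArrayList<String>();
            for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
                chosen.add(availableLocations[k]);
            }
            System.out.println(chosen); // [nc1, nc2, nc1, nc2, nc1]
        }
    }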