1) minor fix in feed console 2) fixed bug incorrect evaluation of feed adapter output type
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
index a0cf4f9..ed97117 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -30,10 +30,13 @@
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -132,7 +135,7 @@
String feedArg = getStringArgument(f, 0);
String outputType = getStringArgument(f, 1);
- String targetDataset = getStringArgument(f, 2);
+ String targetDataset = getStringArgument(f, 1);
AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
Pair<String, String> feedReference = parseDatasetReference(metadataProvider, feedArg);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
index 3721147..71d57d7 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -16,6 +16,8 @@
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -28,6 +30,11 @@
import org.json.JSONObject;
import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
public class FeedDataProviderServlet extends HttpServlet {
@@ -41,16 +48,22 @@
String dataverseName = request.getParameter("dataverse");
String report = getFeedReport(feedName, datasetName, dataverseName);
- System.out.println(" RECEIVED REPORT " + report);
+ System.out.println(" REPORT " + report);
long timestamp = System.currentTimeMillis();
-
- JSONObject obj = new JSONObject();
- JSONArray array = new JSONArray();
- try {
- obj.put("time", timestamp);
- obj.put("value", report);
- } catch (JSONException jsoe) {
- throw new IOException(jsoe);
+ JSONObject obj = null;
+ if (report != null) {
+ JSONArray array = new JSONArray();
+ try {
+ obj = new JSONObject();
+ obj.put("type", "report");
+ obj.put("time", timestamp);
+ obj.put("value", report);
+ } catch (JSONException jsoe) {
+ throw new IOException(jsoe);
+ }
+ } else {
+ obj = verifyIfFeedIsAlive(dataverseName, feedName, datasetName);
+ // do null check
}
PrintWriter out = response.getWriter();
@@ -68,4 +81,42 @@
}
return report;
}
+
+ private JSONObject verifyIfFeedIsAlive(String dataverseName, String feedName, String datasetName) {
+ JSONObject obj = null;
+ try {
+ MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
+ List<FeedActivity> feedActivities = MetadataManager.INSTANCE
+ .getActiveFeeds(ctx, dataverseName, datasetName);
+ boolean active = false;
+ FeedActivity activity = null;
+ for (FeedActivity feedActivity : feedActivities) {
+ active = (feedActivity.getFeedName().equals(feedName) && feedActivity.getActivityType().equals(
+ FeedActivityType.FEED_BEGIN));
+ if (active) {
+ activity = feedActivity;
+ break;
+ }
+ }
+ if (active) {
+ Map<String, String> activityDetails = activity.getFeedActivityDetails();
+ String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
+ String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
+ String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
+ obj = new JSONObject();
+ obj.put("type", "reload");
+ obj.put("ingestLocations", ingestLocations);
+ obj.put("computeLocations", computeLocations);
+ obj.put("storageLocations", storageLocations);
+ System.out.println(" RE LOADING " + " ingestion at " + ingestLocations + " compute at "
+ + computeLocations + " storage at " + storageLocations);
+ } else {
+ obj = null;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return obj;
+
+ }
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 6b79958..94eefb6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -73,7 +73,6 @@
import edu.uci.ics.asterix.file.DatasetOperations;
import edu.uci.ics.asterix.file.DataverseOperations;
import edu.uci.ics.asterix.file.FeedOperations;
-import edu.uci.ics.asterix.file.FeedUtil;
import edu.uci.ics.asterix.file.IndexOperations;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.metadata.IDatasetDetails;
@@ -98,6 +97,7 @@
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index aa25bd8..365f2f5 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -36,6 +36,13 @@
function onFeedReportReceived(data) {
+ var type = data["type"];
+ if (type == ("reload")) {
+ ingestLocations = data["ingestLocations"];
+ computeLocations = data["computeLocations"];
+ storageLocations = data["storageLocations"];
+ document.location.reload(true);
+ } else {
var report = data["value"];
var tputArray = report.split("|");
var covered = 0;
@@ -49,6 +56,7 @@
ingestionTimeSeries[j].append(data["time"], 0);
}
ingestionTimeSeries[numIngestionNodes].append(data["time"], totalTput);
+ }
}
function myYRangeFunction(range) {
@@ -60,7 +68,7 @@
function initTimeline(ingestLocations) {
document.write("<i>" + "Feed Ingestion" + "<i>");
- document.write("<br />" + "Ingestion Locations: " + ingestLocations);
+ document.write("<br />" + "Ingestion Locations: " + computeLocations);
document.write("<br />" + "Compute Locations: " + computeLocations);
document.write("<br />" + "Storage Locations: " + storageLocations);
document.write("<br />" + "Ingestion Policy: " + ingestionPolicy);
@@ -69,7 +77,7 @@
document.write("<br />");
for( var i = 0; i < numIngestionNodes; i++){
- graphNames[i] = ingestionNodes[i];
+ graphNames[i] = "Partition " + i;
}
if(numIngestionNodes > 1){
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
index 3ee53a6..ec8b6be 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
@@ -32,6 +32,10 @@
import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
public class ConnectFeedStatement implements Statement {
@@ -74,41 +78,41 @@
query = new Query();
FunctionSignature appliedFunction = sourceFeed.getAppliedFunction();
Function function = null;
- String adaptorOutputType = null;
+ String adapterOutputType = null;
if (appliedFunction != null) {
function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
if (function == null) {
throw new MetadataException(" Unknown function " + function);
- } else {
- if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) {
- adaptorOutputType = targetDataset.getItemTypeName();
- } else {
- if (function.getParams().size() > 1) {
- throw new MetadataException(" Incompatible function: " + appliedFunction
- + " Number if arguments must be 1");
- }
- adaptorOutputType = function.getParams().get(0);
- }
+ } else if (function.getParams().size() > 1) {
+ throw new MetadataException(" Incompatible function: " + appliedFunction
+ + " Number if arguments must be 1");
}
- } else {
- adaptorOutputType = targetDataset.getItemTypeName();
}
+
+ org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+ try {
+ factoryOutput = FeedUtil.getFeedFactoryAndOutput(sourceFeed, mdTxnCtx);
+ adapterOutputType = factoryOutput.getRight().getTypeName();
+ } catch (AlgebricksException ae) {
+ throw new MetadataException(ae);
+ }
+
StringBuilder builder = new StringBuilder();
builder.append("set" + " " + FunctionUtils.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n");
builder.append("insert into dataset " + datasetName + " ");
if (appliedFunction == null) {
- builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adaptorOutputType + "'"
+ builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adapterOutputType + "'"
+ "," + "'" + targetDataset.getDatasetName() + "'" + ")");
builder.append(" return $x");
} else {
if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) {
String param = function.getParams().get(0);
builder.append(" (" + " for" + " " + param + " in feed-ingest ('" + feedName + "'" + "," + "'"
- + adaptorOutputType + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
+ + adapterOutputType + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
builder.append(" let $y:=(" + function.getFunctionBody() + ")" + " return $y");
} else {
- builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adaptorOutputType
+ builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adapterOutputType
+ "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
builder.append(" let $y:=" + sourceFeed.getDataverseName() + "." + function.getName() + "(" + "$x"
+ ")");
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 7dd2c3f..8a7f6e1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -1397,7 +1397,7 @@
switch (fa.getActivityType()) {
case FEED_BEGIN:
if (!terminatedFeeds.contains(fid)) {
- if (aFeeds.get(fid) == null) {
+ if (aFeeds.get(fid) == null || fa.getActivityId() > aFeeds.get(fid).getActivityId()) {
aFeeds.put(fid, fa);
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 89368e7..5af996f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -19,10 +19,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
@@ -65,10 +63,10 @@
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
-import edu.uci.ics.asterix.metadata.feeds.IFeedMessage.MessageType;
import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
@@ -169,7 +167,7 @@
private final AsterixStorageProperties storageProperties;
- private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+ public static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
@@ -412,65 +410,30 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(JobSpecification jobSpec,
IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
- DatasourceAdapter adapterEntity;
- IAdapterFactory adapterFactory;
- IAType adapterOutputType;
- String adapterName;
- String adapterFactoryClassname;
FeedDataSource feedDataSource = (FeedDataSource) dataSource;
- try {
- adapterName = feedDataSource.getFeed().getAdaptorName();
- adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
- adapterName);
- if (adapterEntity != null) {
- adapterFactoryClassname = adapterEntity.getClassname();
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- } else {
- adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
- if (adapterFactoryClassname != null) {
- } else {
- adapterFactoryClassname = adapterName;
- }
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- }
-
- Map<String, String> configuration = feedDataSource.getFeed().getAdaptorConfiguration();
-
- switch (adapterFactory.getAdapterType()) {
- case TYPED:
- adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
- ((ITypedAdapterFactory) adapterFactory).configure(configuration);
- break;
- case GENERIC:
- String outputTypeName = configuration.get("output-type-name");
- adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
- feedDataSource.getDatasourceDataverse(), outputTypeName).getDatatype();
- ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
- break;
- default:
- throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to create adapter " + e);
- }
-
- ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
- .getSerializerDeserializer(adapterOutputType);
- RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
- FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
- BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
-
- feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
- FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
- feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
- .getFeedConnectionId().getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
- feedDesc, feedPolicy.getProperties());
-
+ FeedIntakeOperatorDescriptor feedIngestor = null;
+ org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
AlgebricksPartitionConstraint constraint = null;
+
try {
- constraint = adapterFactory.getPartitionConstraint();
+ factoryOutput = FeedUtil.getFeedFactoryAndOutput(feedDataSource.getFeed(), mdTxnCtx);
+ IAdapterFactory adapterFactory = factoryOutput.getLeft();
+ ARecordType adapterOutputType = factoryOutput.getRight();
+
+ ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
+ .getSerializerDeserializer(adapterOutputType);
+ RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+ FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
+ BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+
+ feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
+ feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
+ .getFeedConnectionId().getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
+ feedDesc, feedPolicy.getProperties());
+
+ constraint = factoryOutput.getLeft().getPartitionConstraint();
} catch (Exception e) {
throw new AlgebricksException(e);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index 698e2ab..c06f884 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -6,8 +6,6 @@
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -27,32 +25,18 @@
private IOperatorNodePushable nodePushable;
- private FeedPolicyEnforcer policyEnforcer;
-
- private FeedConnectionId feedId;
-
- private LinkedBlockingQueue<Long> statsOutbox;
-
private final boolean collectStatistics;
private List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
private Mode mode;
- private String nodeId;
-
public static final long FLUSH_THRESHOLD_TIME = 5000;
private FramePushWait framePushWait;
- private FeedRuntimeType feedRuntimeType;
-
- private int partition;
-
private Timer timer;
- private ExecutorService executorService;
-
private FrameTupleAccessor fta;
public enum Mode {
@@ -66,14 +50,8 @@
this.writer = writer;
this.mode = Mode.FORWARD;
this.nodePushable = nodePushable;
- this.feedId = feedId;
- this.policyEnforcer = policyEnforcer;
- this.feedRuntimeType = feedRuntimeType;
- this.partition = partition;
- this.executorService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
this.collectStatistics = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
if (collectStatistics) {
- this.statsOutbox = new LinkedBlockingQueue<Long>();
timer = new Timer();
framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, feedId, nodeId, feedRuntimeType,
partition, FLUSH_THRESHOLD_TIME, timer);
@@ -94,19 +72,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Switching to :" + newMode + " from " + this.mode);
}
- switch (newMode) {
- case FORWARD:
- this.mode = newMode;
- break;
- case STORE:
- this.mode = newMode;
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Beginning to store frames :");
- LOGGER.info("Frames accumulated till now:" + frames.size());
- }
- break;
- }
-
+ this.mode = newMode;
}
public List<ByteBuffer> getStoredFrames() {
@@ -201,6 +167,7 @@
public void reset() {
mesgService = null;
+ collectThroughput = true;
}
public void notifyFinish(int numTuples) {
@@ -220,7 +187,8 @@
}
}
if (collectThroughput) {
- System.out.println(" NUMBER of TUPLES " + numTuplesInInterval.get() + " in " + period);
+ System.out.println(" NUMBER of TUPLES " + numTuplesInInterval.get() + " in " + period
+ + " for partition " + partition);
int instantTput = (int) Math.ceil((((double) numTuplesInInterval.get() * 1000) / period));
sendReportToSFM(instantTput, FeedReportMessageType.THROUGHPUT);
}
@@ -247,6 +215,7 @@
try {
mesgService.sendMessage(message);
} catch (IOException ioe) {
+ ioe.printStackTrace();
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Unable to send feed report to SFM for feed " + feedId + " " + feedRuntimeType + "["
+ partition + "]");
@@ -254,6 +223,14 @@
}
}
+ public void deactivate() {
+ collectThroughput = false;
+ }
+
+ public void activate() {
+ collectThroughput = true;
+ }
+
private enum State {
INTIALIZED,
WAITING_FOR_FLUSH_COMPLETION,
@@ -262,99 +239,23 @@
}
- private static class FeedOperatorStatisticsCollector implements Runnable {
-
- private final FeedConnectionId feedId;
- private final LinkedBlockingQueue<Long> inbox;
- private final long[] readings;
- private int readingIndex = 0;
- private int historySize = 10;
- private double runningAvg = -1;
- private double deviationPercentageThreshold = 50;
- private int successiveThresholds = 0;
- private IOperatorNodePushable coreOperatorNodePushable;
- private int count;
-
- public FeedOperatorStatisticsCollector(FeedConnectionId feedId, LinkedBlockingQueue<Long> inbox,
- IOperatorNodePushable coreOperatorNodePushable) {
- this.feedId = feedId;
- this.inbox = inbox;
- this.readings = new long[historySize];
- this.coreOperatorNodePushable = coreOperatorNodePushable;
- }
-
- @Override
- public void run() {
- SuperFeedManager sfm = null;
- try {
- while (sfm == null) {
- sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
- if (sfm == null) {
- Thread.sleep(2000);
- }
- }
-
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Obtained SFM " + sfm + " " + coreOperatorNodePushable.getDisplayName());
- }
- while (true) {
- Long reading = inbox.take();
- if (count != historySize) {
- count++;
- }
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Obtained Reading " + reading + " " + coreOperatorNodePushable.getDisplayName());
- }
- double newRunningAvg;
- double deviation = 0;
- if (runningAvg >= 0) {
- int prevIndex = readingIndex == 0 ? historySize - 1 : readingIndex - 1;
- newRunningAvg = (runningAvg * count - readings[prevIndex] + reading) / (count);
- deviation = reading - runningAvg;
- } else {
- newRunningAvg = reading;
- }
-
- double devPercentage = (deviation * 100 / runningAvg);
-
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Current reading :" + reading + " Previous avg:" + runningAvg + " New Average: "
- + newRunningAvg + " deviation % " + devPercentage + " Op "
- + coreOperatorNodePushable.getDisplayName());
- }
-
- if (devPercentage > deviationPercentageThreshold) {
- successiveThresholds++;
- if (successiveThresholds > 1) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("CONGESTION in sending frames by "
- + coreOperatorNodePushable.getDisplayName());
- }
- successiveThresholds = 0;
- }
- } else {
- runningAvg = newRunningAvg;
- readings[readingIndex] = reading;
- readingIndex = (readingIndex + 1) % historySize;
- }
- }
- } catch (InterruptedException ie) {
- // do nothing
- }
- }
- }
-
@Override
public void fail() throws HyracksDataException {
writer.fail();
- if (framePushWait != null) {
+ if (framePushWait != null && !framePushWait.feedRuntimeType.equals(FeedRuntimeType.INGESTION)) {
framePushWait.cancel();
+ framePushWait.deactivate();
}
+ framePushWait.reset();
}
@Override
public void close() throws HyracksDataException {
if (framePushWait != null) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Closing frame statistics collection activity" + framePushWait);
+ }
+ framePushWait.deactivate();
framePushWait.cancel();
}
writer.close();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index ca73324..5ed4ccc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
@@ -21,6 +22,7 @@
import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -42,6 +44,7 @@
private FeedRuntime ingestionRuntime;
private final String nodeId;
private FrameTupleAccessor fta;
+ private FeedFrameWriter feedFrameWriter;
public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedId, IFeedAdapter adapter,
Map<String, String> feedPolicy, int partition, IngestionRuntime ingestionRuntime) {
@@ -63,9 +66,9 @@
AdapterRuntimeManager adapterRuntimeMgr = null;
try {
if (ingestionRuntime == null) {
- FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
+ feedFrameWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
FeedRuntimeType.INGESTION, partition, fta);
- adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, mWriter, partition, inbox);
+ adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, feedFrameWriter, partition, inbox);
if (adapter instanceof AbstractFeedDatasourceAdapter) {
((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
@@ -73,7 +76,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Beginning new feed:" + feedId);
}
- mWriter.open();
+ feedFrameWriter.open();
adapterRuntimeMgr.start();
} else {
adapterRuntimeMgr = ((IngestionRuntime) ingestionRuntime).getAdapterRuntimeManager();
@@ -85,6 +88,7 @@
adapterRuntimeMgr.getAdapterExecutor().setWriter(writer);
adapterRuntimeMgr.getAdapterExecutor().getWriter().reset();
adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
+ feedFrameWriter = adapterRuntimeMgr.getAdapterExecutor().getWriter();
}
ingestionRuntime = adapterRuntimeMgr.getIngestionRuntime();
@@ -94,22 +98,30 @@
}
}
FeedManager.INSTANCE.deRegisterFeedRuntime(ingestionRuntime.getFeedRuntimeId());
+ feedFrameWriter.close();
} catch (InterruptedException ie) {
if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Continuing on failure as per feed policy");
}
adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
- writer.fail();
+ FeedRuntimeManager runtimeMgr = FeedManager.INSTANCE.getFeedRuntimeManager(feedId);
+ try {
+ runtimeMgr.close(false);
+ } catch (IOException ioe) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to close Feed Runtime Manager " + ioe.getMessage());
+ }
+ }
+ feedFrameWriter.fail();
} else {
FeedManager.INSTANCE.deRegisterFeedRuntime(ingestionRuntime.getFeedRuntimeId());
+ feedFrameWriter.close();
throw new HyracksDataException(ie);
}
} catch (Exception e) {
e.printStackTrace();
throw new HyracksDataException(e);
- } finally {
- writer.close();
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index 83060f2..d910ca8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -62,7 +62,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Closing feed runtime manager: " + mgr);
}
- mgr.close();
+ mgr.close(true);
}
} catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
@@ -98,6 +98,9 @@
FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedRuntimeId.getFeedId());
if (runtimeMgr != null) {
runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Deregistered Feed Runtime " + feedRuntimeId);
+ }
}
}
@@ -112,6 +115,9 @@
FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
if (runtimeMgr != null) {
runtimeMgr.setSuperFeedManager(sfm);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registered Super Feed Manager " + sfm);
+ }
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 8838ad4..a8541f5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -17,7 +17,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
index c93bb5a..2e31f6d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
@@ -61,6 +61,9 @@
LOGGER.warning("Unable to start feed message service for " + feedId);
}
}
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("ENDED FEED MESSAGE SERVICE for " + feedId);
+ }
} catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Exception in handling incoming feed messages" + e.getMessage());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
index 3b2eaaf..b4951ec 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
@@ -38,7 +38,7 @@
feedReportQueue = new LinkedBlockingQueue<String>();
}
- public void close() throws IOException {
+ public void close(boolean closeAll) throws IOException {
socketFactory.close();
if (messageService != null) {
@@ -46,6 +46,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Shut down message service s for :" + feedId);
}
+ messageService = null;
}
if (superFeedManager != null && superFeedManager.isLocal()) {
superFeedManager.stop();
@@ -54,10 +55,12 @@
}
}
- if (executorService != null) {
- executorService.shutdownNow();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Shut down executor service for :" + feedId);
+ if (closeAll) {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Shut down executor service for :" + feedId);
+ }
}
}
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
similarity index 74%
rename from asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index a09ca3c..e6bb81d 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.asterix.file;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.ArrayList;
import java.util.HashMap;
@@ -11,14 +11,20 @@
import org.apache.commons.lang3.tuple.Pair;
import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
-import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
@@ -171,4 +177,53 @@
}
+ public static Pair<IAdapterFactory, ARecordType> getFeedFactoryAndOutput(Feed feed,
+ MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
+
+ String adapterName = null;
+ DatasourceAdapter adapterEntity = null;
+ String adapterFactoryClassname = null;
+ IAdapterFactory adapterFactory = null;
+ ARecordType adapterOutputType = null;
+ Pair<IAdapterFactory, ARecordType> feedProps = null;
+ try {
+ adapterName = feed.getAdaptorName();
+ adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+ adapterName);
+ if (adapterEntity != null) {
+ adapterFactoryClassname = adapterEntity.getClassname();
+ adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ } else {
+ adapterFactoryClassname = AqlMetadataProvider.adapterFactoryMapping.get(adapterName);
+ if (adapterFactoryClassname != null) {
+ } else {
+ adapterFactoryClassname = adapterName;
+ }
+ adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ }
+
+ Map<String, String> configuration = feed.getAdaptorConfiguration();
+
+ switch (adapterFactory.getAdapterType()) {
+ case TYPED:
+ adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
+ ((ITypedAdapterFactory) adapterFactory).configure(configuration);
+ break;
+ case GENERIC:
+ String outputTypeName = configuration.get("output-type-name");
+ adapterOutputType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
+ feed.getDataverseName(), outputTypeName).getDatatype();
+ ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
+ break;
+ default:
+ throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
+ }
+
+ feedProps = Pair.of(adapterFactory, adapterOutputType);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("unable to create adapter " + e);
+ }
+ return feedProps;
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java
index 3be39f3..20a6b0e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java
@@ -28,11 +28,11 @@
}
public void stop() {
+ listenerServer.stop();
+ System.out.println("STOPPED MESSAGE RECEIVING SERVICE AT " + port);
if (!executorService.isShutdown()) {
executorService.shutdownNow();
}
- listenerServer.stop();
- System.out.println("STOPPED MESSAGE RECEIVING SERVICE AT " + port);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index 4d15b8e..18177c2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -120,6 +120,11 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Stopped super feed manager! " + this);
}
+ started = false;
+ }
+
+ public boolean isStarted() {
+ return started;
}
@Override
@@ -212,6 +217,9 @@
String message = inbox.take();
os.write((message + EOM).getBytes());
}
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Unsubscribed from " + feedId + " disconnected");
+ }
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
@@ -265,6 +273,9 @@
e.printStackTrace();
}
}
+ for (MessageListener listener : messageListeners) {
+ listener.stop();
+ }
mesgAnalyzer.stop();
}
@@ -348,8 +359,11 @@
String[] comp = message.split("\\|");
String parition = comp[3];
String tput = comp[4];
+ String location = comp[5];
ingestionThroughputs.put(parition, tput);
- for (String tp : ingestionThroughputs.values()) {
+ // throughput in partition order
+ for (int i = 0; i < ingestionThroughputs.size(); i++) {
+ String tp = ingestionThroughputs.get(i + "");
finalMessage.append(tp + "|");
}
q.add(finalMessage.toString());