Add push-based channels and improve broker notifications
Change-Id: Ie3c7cae0f015d6bc01dd912499565bb12c15abc3
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
index 41853b9..3df9a76 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -18,51 +18,16 @@
*/
package org.apache.asterix.bad;
-import java.io.BufferedReader;
-import java.io.DataOutputStream;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.AUUID;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
- * Provides functionality for channel jobs and communicating with Brokers
+ * Provides functionality for channel jobs
*/
public class ChannelJobService {
private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());
- public static void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint,
- AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException {
- String formattedString;
- formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
- sendMessage(brokerEndpoint, formattedString);
- }
-
- public static String formatJSON(EntityId activeJobId, AOrderedList subscriptionIds, String channelExecutionTime) {
- String JSON = "{ \"dataverseName\":\"" + activeJobId.getDataverse() + "\", \"channelName\":\""
- + activeJobId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
- + channelExecutionTime + "\", \"subscriptionIds\":[";
- for (int i = 0; i < subscriptionIds.size(); i++) {
- AUUID subId = (AUUID) subscriptionIds.getItem(i);
- String subscriptionString = subId.toString();
- //Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value
- subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2);
- JSON += "\"" + subscriptionString + "\"";
- if (i < subscriptionIds.size() - 1) {
- JSON += ",";
- }
- }
- JSON += "]}";
- return JSON;
-
- }
public static long findPeriod(String duration) {
//TODO: Allow Repetitive Channels to use YMD durations
@@ -92,61 +57,6 @@
return (long) (seconds * 1000);
}
- public static void sendMessage(String targetURL, String urlParameters) {
- HttpURLConnection connection = null;
- try {
- //Create connection
- URL url = new URL(targetURL);
- connection = (HttpURLConnection) url.openConnection();
- connection.setRequestMethod("POST");
- connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
-
- connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
- connection.setRequestProperty("Content-Language", "en-US");
-
- connection.setUseCaches(false);
- connection.setDoOutput(true);
- connection.setConnectTimeout(500);
-
- if (connection.getOutputStream() != null) {
- //Send message
- DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
- wr.writeBytes(urlParameters);
- wr.close();
- } else {
- throw new Exception();
- }
-
- if (LOGGER.isLoggable(Level.INFO)) {
- int responseCode = connection.getResponseCode();
- LOGGER.info("\nSending 'POST' request to URL : " + url);
- LOGGER.info("Post parameters : " + urlParameters);
- LOGGER.info("Response Code : " + responseCode);
- }
-
- if (connection.getInputStream() != null) {
- BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
- String inputLine;
- StringBuffer response = new StringBuffer();
- while ((inputLine = in.readLine()) != null) {
- response.append(inputLine);
- }
- in.close();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.log(Level.INFO, response.toString());
- }
- } else {
- LOGGER.log(Level.WARNING, "Channel Failed to get response from Broker.");
- }
-
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
- } finally {
- if (connection != null) {
- connection.disconnect();
- }
- }
- }
@Override
public String toString() {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 161f093..87ac320 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -40,7 +40,6 @@
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
-import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -48,6 +47,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.CallExpr;
@@ -59,6 +59,7 @@
import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
@@ -92,14 +93,16 @@
private String subscriptionsTableName;
private String resultsTableName;
private String dataverse;
+ private final boolean push;
public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
- Expression period) {
+ Expression period, boolean push) {
this.channelName = channelName;
this.dataverseName = dataverseName;
this.function = function;
this.period = (CallExpr) period;
this.duration = "";
+ this.push = push;
}
public Identifier getDataverseName() {
@@ -218,12 +221,37 @@
}
+ /**
+  * Compiles the query of a push channel into a job specification inside its own metadata transaction.
+  * The transaction is committed after a successful compilation and aborted when compilation or the
+  * commit throws; the metadata provider's locks are unlocked on every path.
+  */
+ private JobSpecification compilePushChannel(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+         IHyracksClientConnection hcc, Query q) throws Exception {
+     MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+     metadataProvider.setMetadataTxnContext(mdTxnCtx);
+     try {
+         JobSpecification compiledSpec =
+                 ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+         return compiledSpec;
+     } catch (Exception e) {
+         LOGGER.log(Level.INFO, e.getMessage(), e);
+         //Nothing in the try block can throw after the commit returns, so the transaction is still active here
+         ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+         throw e;
+     } finally {
+         metadataProvider.getLocks().unlock();
+     }
+ }
+
private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("SET inline_with \"false\";\n");
- builder.append("insert into " + dataverse + "." + resultsTableName);
- builder.append(" as a (\n" + "with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
+ if (!push) {
+ builder.append("insert into " + dataverse + "." + resultsTableName);
+ builder.append(" as a (\n");
+ }
+ builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
builder.append("select result, ");
builder.append(BADConstants.ChannelExecutionTime + ", ");
builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
@@ -238,15 +266,19 @@
builder.append("sub.param" + i + ") result \n");
builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n");
builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n");
- builder.append(")");
- builder.append(" returning a");
+ if (!push) {
+ builder.append(")");
+ builder.append(" returning a");
+ }
builder.append(";");
BADParserFactory factory = new BADParserFactory();
List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
SetStatement ss = (SetStatement) fStatements.get(0);
metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
-
+ if (push) {
+ return compilePushChannel(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
+ }
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index d83b606..9ead7f0 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -31,6 +31,7 @@
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.IAType;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -43,6 +44,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -74,133 +76,197 @@
if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
return false;
}
+ boolean push = false;
+
AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
- return false;
+ if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
+ return false;
+ }
+ push = true;
}
- DelegateOperator eOp = (DelegateOperator) op;
- if (!(eOp.getDelegate() instanceof CommitOperator)) {
- return false;
- }
- AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
- if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
- return false;
- }
- InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
- if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
- return false;
- }
- DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
- String datasetName = dds.getDataset().getDatasetName();
- if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
- || !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
- || !datasetName.endsWith("Results")) {
- return false;
- }
- String channelDataverse = dds.getDataset().getDataverseName();
- //Now we know that we are inserting into results
+ DataSourceScanOperator subscriptionsScan;
+ String channelDataverse;
+ String channelName;
- String channelName = datasetName.substring(0, datasetName.length() - 7);
- String subscriptionsName = channelName + "Subscriptions";
- //TODO: Can we check here to see if there is a channel with such a name?
+ if (!push) {
+ DelegateOperator eOp = (DelegateOperator) op;
+ if (!(eOp.getDelegate() instanceof CommitOperator)) {
+ return false;
+ }
+ AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+ if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+ return false;
+ }
+ InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
+ if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
+ return false;
+ }
+ DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
+ String datasetName = dds.getDataset().getDatasetName();
+ if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
+ || !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
+ || !datasetName.endsWith("Results")) {
+ return false;
+ }
+ channelDataverse = dds.getDataset().getDataverseName();
+ //Now we know that we are inserting into results
- DataSourceScanOperator subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
- if (subscriptionsScan == null) {
- return false;
+ channelName = datasetName.substring(0, datasetName.length() - 7);
+ String subscriptionsName = channelName + "Subscriptions";
+ subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
+ if (subscriptionsScan == null) {
+ return false;
+ }
+
+ } else {
+ //if push, get the channel name here instead
+ subscriptionsScan = (DataSourceScanOperator) findOp(op, "");
+ if (subscriptionsScan == null) {
+ return false;
+ }
+ DatasetDataSource dds = (DatasetDataSource) subscriptionsScan.getDataSource();
+ String datasetName = dds.getDataset().getDatasetName();
+ channelDataverse = dds.getDataset().getDataverseName();
+ channelName = datasetName.substring(0, datasetName.length() - 13);
}
- //Now we want to make sure and set the commit to be a nonsink commit
- ((CommitOperator) eOp.getDelegate()).setSink(false);
-
- //Now we need to get the broker EndPoint
+ //Now we need to get the broker EndPoint
LogicalVariable brokerEndpointVar = context.newVar();
AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
- AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
- //now brokerNameVar holds the brokerName for use farther up in the plan
-
- context.computeAndSetTypeEnvironmentForOperator(assignOp);
- context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
- context.computeAndSetTypeEnvironmentForOperator(eOp);
+ if (opAboveBrokersScan == null) {
+ return false;
+ }
//get subscriptionIdVar
LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
//The channelExecutionTime is created just before the scan
- LogicalVariable channelExecutionVar = ((AssignOperator) subscriptionsScan.getInputs().get(0).getValue())
- .getVariables().get(0);
+ ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
+ if (channelExecutionAssign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+ return false;
+ }
+ LogicalVariable channelExecutionVar = ((AssignOperator) channelExecutionAssign).getVariables().get(0);
+ if (!channelExecutionVar.toString().equals("$$" + BADConstants.ChannelExecutionTime)) {
+ return false;
+ }
- ProjectOperator badProject = (ProjectOperator) findOp(op, "project");
+ if (!push) {
+ ((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
+ }
+
+ AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
+ //now brokerEndpointVar holds the broker endpoint for use farther up in the plan
+
+ context.computeAndSetTypeEnvironmentForOperator(assignOp);
+ context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
+ context.computeAndSetTypeEnvironmentForOperator(op);
+
+ ProjectOperator badProject = (ProjectOperator) findOp(op1, "project");
badProject.getVariables().add(subscriptionIdVar);
badProject.getVariables().add(brokerEndpointVar);
badProject.getVariables().add(channelExecutionVar);
context.computeAndSetTypeEnvironmentForOperator(badProject);
+
//Create my brokerNotify plan above the extension Operator
- DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar,
- context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName);
+ DelegateOperator dOp = push
+ ? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
+ context, op, (DistributeResultOperator) op1, channelDataverse, channelName)
+ : createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, context, op,
+ (DistributeResultOperator) op1, channelDataverse, channelName);
opRef.setValue(dOp);
return true;
}
- private DelegateOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar,
- LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext context,
- ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName)
- throws AlgebricksException {
- //create the Distinct Op
- ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
- VariableReferenceExpression vExpr = new VariableReferenceExpression(subscriptionIdVar);
- expressions.add(new MutableObject<ILogicalExpression>(vExpr));
- DistinctOperator distinctOp = new DistinctOperator(expressions);
-
- //create the GroupBy Op
- //And set the distinct as input
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
- List<ILogicalPlan> nestedPlans = new ArrayList<ILogicalPlan>();
-
- //create group by operator
- GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
- groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar));
- groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar));
- groupbyOp.getInputs().add(new MutableObject<ILogicalOperator>(distinctOp));
-
- //create nested plan for subscription ids in group by
- NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator(
- new MutableObject<ILogicalOperator>(groupbyOp));
- //TODO: This is from translationcontext. It might be needed to make the variable exist outside of the subplan
- //LogicalVariable subscriptionListVar = context.newSubplanOutputVar();
- LogicalVariable subscriptionListVar = context.newVar();
- List<LogicalVariable> aggVars = new ArrayList<LogicalVariable>();
- aggVars.add(subscriptionListVar);
- AggregateFunctionCallExpression funAgg = BuiltinFunctions.makeAggregateFunctionExpression(
- BuiltinFunctions.LISTIFY, new ArrayList<Mutable<ILogicalExpression>>());
- funAgg.getArguments()
- .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(subscriptionIdVar)));
- List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<Mutable<ILogicalExpression>>();
- aggExpressions.add(new MutableObject<ILogicalExpression>(funAgg));
- AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions);
- listifyOp.getInputs().add(new MutableObject<ILogicalOperator>(nestedTupleSourceOp));
-
- //add nested plans
- nestedPlans.add(new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(listifyOp)));
-
- //Create the NotifyBrokerOperator
- NotifyBrokerOperator notifyBrokerOp = new NotifyBrokerOperator(brokerEndpointVar, subscriptionListVar,
- channelExecutionVar);
+ /**
+  * Builds the delegate operator that wraps a {@link NotifyBrokerOperator} for the given channel.
+  * The logical operator and the returned delegate share one {@link NotifyBrokerPOperator}, which is
+  * keyed by the channel's {@link EntityId}. The returned operator has no inputs attached; the
+  * push and pull plan builders add them.
+  */
+ private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
+ LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
+ IAType resultType) {
+ NotifyBrokerOperator notifyBrokerOp =
+ new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push, resultType);
EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
extensionOp.setPhysicalOperator(notifyBrokerPOp);
- extensionOp.getInputs().add(new MutableObject<ILogicalOperator>(groupbyOp));
+ return extensionOp;
+ }
- //Set the input for the brokerNotify as the replicate operator
- distinctOp.getInputs().add(new MutableObject<ILogicalOperator>(eOp));
+ /**
+  * Builds the notification plan for a push channel by placing a notify-broker delegate operator
+  * directly on top of {@code eOp}. The type of {@code sendVar} is read from the output type
+  * environment of the first ASSIGN found by following the first input of each operator below
+  * {@code eOp}, and is handed to the notify-broker operator.
+  *
+  * @throws AlgebricksException if no ASSIGN operator exists on that path, or if computing a type
+  *             environment fails
+  */
+ private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
+         LogicalVariable channelExecutionVar, IOptimizationContext context, ILogicalOperator eOp,
+         DistributeResultOperator distributeOp, String channelDataverse, String channelName)
+         throws AlgebricksException {
+     //Find the assign operator to get the result type that we need
+     if (eOp.getInputs().isEmpty()) {
+         throw new AlgebricksException("No input below the top operator of push channel " + channelDataverse
+                 + "." + channelName);
+     }
+     AbstractLogicalOperator assign = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+     while (assign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+         //Stop at a leaf instead of indexing into an empty input list
+         if (assign.getInputs().isEmpty()) {
+             throw new AlgebricksException("No assign operator found in the plan of push channel "
+                     + channelDataverse + "." + channelName);
+         }
+         assign = (AbstractLogicalOperator) assign.getInputs().get(0).getValue();
+     }
+     IVariableTypeEnvironment env = assign.computeOutputTypeEnvironment(context);
+     IAType resultType = (IAType) env.getVarType(sendVar);
+
+     //Create the NotifyBrokerOperator
+     DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendVar, channelExecutionVar, channelDataverse,
+             channelName, true, resultType);
+
+     //The notify operator consumes the old top of the plan directly; push plans do no grouping
+     extensionOp.getInputs().add(new MutableObject<>(eOp));
+     context.computeAndSetTypeEnvironmentForOperator(extensionOp);
+
+     return extensionOp;
+ }
+
+ private DelegateOperator createNotifyBrokerPullPlan(LogicalVariable brokerEndpointVar,
+ LogicalVariable sendVar, LogicalVariable channelExecutionVar, IOptimizationContext context,
+ ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName)
+ throws AlgebricksException {
+
+ //Create the Distinct Op
+ ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<>();
+ VariableReferenceExpression vExpr = new VariableReferenceExpression(sendVar);
+ expressions.add(new MutableObject<>(vExpr));
+ DistinctOperator distinctOp = new DistinctOperator(expressions);
+
+
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<>();
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<>();
+ List<ILogicalPlan> nestedPlans = new ArrayList<>();
+
+ //Create GroupBy operator
+ GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
+ groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar));
+ groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar));
+
+ //Set the distinct as input
+ groupbyOp.getInputs().add(new MutableObject<>(distinctOp));
+
+ //create nested plan for subscription ids in group by
+ NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator(new MutableObject<>(groupbyOp));
+ LogicalVariable sendListVar = context.newVar();
+ List<LogicalVariable> aggVars = new ArrayList<>();
+ aggVars.add(sendListVar);
+ AggregateFunctionCallExpression funAgg =
+ BuiltinFunctions.makeAggregateFunctionExpression(BuiltinFunctions.LISTIFY, new ArrayList<>());
+ funAgg.getArguments().add(new MutableObject<>(new VariableReferenceExpression(sendVar)));
+ List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<>();
+ aggExpressions.add(new MutableObject<>(funAgg));
+ AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions);
+ listifyOp.getInputs().add(new MutableObject<>(nestedTupleSourceOp));
+
+ //add nested plans
+ nestedPlans.add(new ALogicalPlanImpl(new MutableObject<>(listifyOp)));
+
+
+ //Create the NotifyBrokerOperator
+ DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendListVar, channelExecutionVar,
+ channelDataverse, channelName, false, null);
+
+ //Set the input for the distinct as the old top
+ extensionOp.getInputs().add(new MutableObject<>(groupbyOp));
+ distinctOp.getInputs().add(new MutableObject<>(eOp));
//compute environment bottom up
-
context.computeAndSetTypeEnvironmentForOperator(distinctOp);
context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp);
@@ -211,7 +277,6 @@
}
- @SuppressWarnings("unchecked")
private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
AbstractLogicalOperator opAboveBrokersScan) {
Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
@@ -244,9 +309,10 @@
return assignOp;
}
- /*This function searches for the needed op
- * If lookingForBrokers, find the op above the brokers scan
- * Else find the suscbriptionsScan
+ /*This function is used to find specific operators within the plan, either
+ * A. The brokers dataset scan
+ * B. The subscriptions scan
+ * C. The highest project of the plan
*/
private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
if (!op.hasInputs()) {
@@ -311,7 +377,7 @@
DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
&& dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
- if (dds.getDataset().getDatasetName().equals(subscriptionsName)) {
+ if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
return true;
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
index d281b49..df0f0f4 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -20,6 +20,7 @@
import java.util.Collection;
+import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
@@ -27,22 +28,26 @@
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
/**
- * A repetitive channel operator, which uses a Java timer to run a given query periodically
+ * An operator for sending broker notifications
*/
public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator {
- private final LogicalVariable subscriptionIdVar;
private final LogicalVariable brokerEndpointVar;
private final LogicalVariable channelExecutionVar;
+ private final LogicalVariable pushListVar;
+ private final boolean push;
+ private final IAType recordType;
- public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable subscriptionIdVar,
- LogicalVariable resultSetVar) {
+ public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable pushListVar,
+ LogicalVariable resultSetVar, boolean push, IAType recordType) {
this.brokerEndpointVar = brokerEndpointVar;
- this.subscriptionIdVar = subscriptionIdVar;
this.channelExecutionVar = resultSetVar;
+ this.pushListVar = pushListVar;
+ this.push = push;
+ this.recordType = recordType;
}
- public LogicalVariable getSubscriptionVariable() {
- return subscriptionIdVar;
+ public LogicalVariable getPushListVar() {
+ return pushListVar;
}
public LogicalVariable getBrokerEndpointVariable() {
@@ -53,9 +58,18 @@
return channelExecutionVar;
}
+ /**
+  * Returns the type passed to the constructor as {@code recordType}. It may be null: the pull plan
+  * built by InsertBrokerNotifierForChannelRule passes null.
+  */
+ public IAType getRecordType() {
+ return recordType;
+ }
+
+ /**
+  * Returns the {@code push} flag passed to the constructor, i.e. whether this operator was created
+  * for a push channel.
+  */
+ public boolean getPush() {
+ return push;
+ }
+
@Override
public String toString() {
- return "notify-brokers";
+ return "notify-brokers (" + brokerEndpointVar.toString() + "," + channelExecutionVar.toString() + ","
+ + pushListVar.toString() + ")";
}
@Override
@@ -65,7 +79,7 @@
@Override
public IOperatorDelegate newInstance() {
- return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar);
+ return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, channelExecutionVar, push, recordType);
}
@Override
@@ -76,7 +90,7 @@
@Override
public void getUsedVariables(Collection<LogicalVariable> usedVars) {
- usedVars.add(subscriptionIdVar);
+ usedVars.add(pushListVar);
usedVars.add(brokerEndpointVar);
usedVars.add(channelExecutionVar);
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
index 12d5ae2..b9cfbfd 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -20,6 +20,7 @@
package org.apache.asterix.bad.runtime;
import org.apache.asterix.active.EntityId;
+import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -74,20 +75,22 @@
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
DelegateOperator notify = (DelegateOperator) op;
- LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable();
+ LogicalVariable pushListVar = ((NotifyBrokerOperator) notify.getDelegate()).getPushListVar();
LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
+ IAType recordType = ((NotifyBrokerOperator) notify.getDelegate()).getRecordType();
+ boolean push = ((NotifyBrokerOperator) notify.getDelegate()).getPush();
int brokerColumn = inputSchemas[0].findVariable(brokerVar);
- int subColumn = inputSchemas[0].findVariable(subVar);
+ int pushColumn = inputSchemas[0].findVariable(pushListVar);
int executionColumn = inputSchemas[0].findVariable(executionVar);
IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn);
- IScalarEvaluatorFactory subEvalFactory = new ColumnAccessEvalFactory(subColumn);
+ IScalarEvaluatorFactory pushListEvalFactory = new ColumnAccessEvalFactory(pushColumn);
IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn);
- NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, subEvalFactory,
- channelExecutionEvalFactory, entityId);
+ NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, pushListEvalFactory,
+ channelExecutionEvalFactory, entityId, push, recordType);
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
context);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 5d51926..6ffb244 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -20,21 +20,33 @@
package org.apache.asterix.bad.runtime;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.EntityId;
-import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AUUID;
import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -48,11 +60,13 @@
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+ private static final Logger LOGGER = Logger.getLogger(NotifyBrokerRuntime.class.getName());
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream di = new DataInputStream(bbis);
private final AOrderedListSerializerDeserializer subSerDes =
new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null));
+ private final ARecordSerializerDeserializer recordSerDes;
private IPointable inputArg0 = new VoidPointable();
private IPointable inputArg1 = new VoidPointable();
@@ -62,17 +76,29 @@
private IScalarEvaluator eval2;
private final ActiveManager activeManager;
private final EntityId entityId;
+ private final boolean push;
+ private AOrderedList pushList;
+ private ARecord pushRecord;
+ private final IAType recordType;
+ private final Map<String, HashSet<String>> sendData = new HashMap<>();
+ private String executionTimeString;
public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
- IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
- EntityId activeJobId) throws HyracksDataException {
+ IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
+ EntityId activeJobId, boolean push, IAType recordType) throws HyracksDataException {
this.tRef = new FrameTupleReference();
eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
- eval1 = subEvalFactory.createScalarEvaluator(ctx);
+ eval1 = pushListEvalFactory.createScalarEvaluator(ctx); // push: a result record; else a list of subscription UUIDs
eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
.getApplicationContext()).getActiveManager();
this.entityId = activeJobId;
+ this.push = push; // selects how nextFrame deserializes eval1's output
+ this.pushList = null; // same as the field default; kept explicit
+ this.pushRecord = null; // same as the field default; kept explicit
+ this.recordType = recordType; // NOTE(review): cast to ARecordType just below - confirm every caller passes one
+ recordSerDes = new ARecordSerializerDeserializer((ARecordType) recordType);
+ executionTimeString = null; // decoded lazily from the first tuple handled by nextFrame
}
@Override
@@ -80,6 +106,61 @@
return;
}
+    /**
+     * Buffers each subscription id of {@code subscriptionIds} under {@code endpoint} as a quoted JSON string.
+     * The endpoint must already have a bucket in {@code sendData}.
+     */
+    private void addSubscriptions(String endpoint, AOrderedList subscriptionIds) {
+        final int count = subscriptionIds.size();
+        for (int idx = 0; idx < count; idx++) {
+            String rendered = ((AUUID) subscriptionIds.getItem(idx)).toString();
+            //Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value
+            String bareValue = rendered.substring(8, rendered.length() - 2);
+            sendData.get(endpoint).add("\"" + bareValue + "\"");
+        }
+    }
+
+    /**
+     * Builds the JSON notification body for one broker endpoint.
+     *
+     * @param endpoint broker URL; must already have a bucket in {@code sendData}
+     * @return JSON object with dataverseName, channelName, the channel execution time and "subscriptionIds"
+     */
+    public String createData(String endpoint) {
+        // String.join yields "[]" for an empty bucket; trimming a trailing comma used to eat the '[' instead.
+        return "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
+                + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+                + executionTimeString + "\", \"subscriptionIds\":[" + String.join(",", sendData.get(endpoint))
+                + "]}";
+    }
+
+    /**
+     * POSTs everything buffered for {@code endpoint} to that broker as a single request.
+     * Best effort: any failure is logged at WARNING (with its cause) and not propagated.
+     *
+     * @param endpoint broker URL; also the key of the bucket in {@code sendData} to send
+     */
+    private void sendGroupOfResults(String endpoint) {
+        String urlParameters = createData(endpoint);
+        // writeBytes() keeps only the low byte of each char, so encode explicitly and send the bytes we counted.
+        byte[] payload = urlParameters.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        HttpURLConnection connection = null;
+        try {
+            //Create connection
+            URL url = new URL(endpoint);
+            connection = (HttpURLConnection) url.openConnection();
+            connection.setRequestMethod("POST");
+            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
+            connection.setRequestProperty("Content-Length", Integer.toString(payload.length));
+            connection.setRequestProperty("Content-Language", "en-US");
+            connection.setUseCaches(false);
+            connection.setDoOutput(true);
+            connection.setConnectTimeout(500);
+            try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) {
+                wr.write(payload);
+            }
+            // Always ask for the status, not only when INFO is enabled: without a streaming mode the JDK
+            // buffers the POST body and only transmits the request once the response is requested.
+            int responseCode = connection.getResponseCode();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("\nSending 'POST' request to URL : " + url);
+                LOGGER.info("Post parameters : " + urlParameters);
+                LOGGER.info("Response Code : " + responseCode);
+            }
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.", e);
+        } finally {
+            // Release the connection on the failure path as well.
+            if (connection != null) {
+                connection.disconnect();
+            }
+        }
+    }
+
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
@@ -91,33 +172,47 @@
eval1.evaluate(tRef, inputArg1);
eval2.evaluate(tRef, inputArg2);
- int serBrokerOffset = inputArg0.getStartOffset();
- bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
- AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di);
-
- int serSubOffset = inputArg1.getStartOffset();
- bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
- AOrderedList subs = subSerDes.deserialize(di);
-
- int resultSetOffset = inputArg2.getStartOffset();
- bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
- ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
- String executionTimeString;
- try {
- executionTimeString = executionTime.toSimpleString();
- } catch (IOException e) {
- throw HyracksDataException.create(e);
+ if (executionTimeString == null) {
+ int resultSetOffset = inputArg2.getStartOffset();
+ bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
+ ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
+ try {
+ executionTimeString = executionTime.toSimpleString();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
}
- ChannelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs,
- executionTimeString);
+ int serBrokerOffset = inputArg0.getStartOffset();
+ bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
+ String endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
+ sendData.putIfAbsent(endpoint, new HashSet<>());
+ if (push) {
+ int pushOffset = inputArg1.getStartOffset();
+ bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
+ //TODO: Right now this creates an object per channel result. Need to find a better way to deserialize
+ pushRecord = recordSerDes.deserialize(di);
+ sendData.get(endpoint).add(pushRecord.toString());
+
+ } else {
+ int serSubOffset = inputArg1.getStartOffset();
+ bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
+ pushList = subSerDes.deserialize(di);
+ addSubscriptions(endpoint, pushList);
+ }
}
}
@Override
public void close() throws HyracksDataException {
+ for (String endpoint : sendData.keySet()) { // one batched POST per broker endpoint seen by nextFrame
+ if (sendData.get(endpoint).size() > 0) { // nothing buffered for this endpoint: skip it
+ sendGroupOfResults(endpoint);
+ sendData.get(endpoint).clear(); // cleared even when the POST failed; failures are only logged
+ }
+ }
return;
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
index 0e2be8b..a7f12ba 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
@@ -20,6 +20,7 @@
package org.apache.asterix.bad.runtime;
import org.apache.asterix.active.EntityId;
+import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -31,16 +32,21 @@
private static final long serialVersionUID = 1L;
private final IScalarEvaluatorFactory brokerEvalFactory;
- private final IScalarEvaluatorFactory subEvalFactory;
+ private final IScalarEvaluatorFactory pushListEvalFactory;
private final IScalarEvaluatorFactory channelExecutionEvalFactory;
private final EntityId entityId;
+ private final boolean push;
+ private final IAType recordType;
- public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory, IScalarEvaluatorFactory subEvalFactory,
- IScalarEvaluatorFactory channelExecutionEvalFactory, EntityId entityId) {
+ public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory,
+ IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
+ EntityId entityId, boolean push, IAType recordType) {
this.brokerEvalFactory = brokerEvalFactory;
- this.subEvalFactory = subEvalFactory;
+ this.pushListEvalFactory = pushListEvalFactory; // replaces subEvalFactory; same constructor position
this.channelExecutionEvalFactory = channelExecutionEvalFactory;
this.entityId = entityId;
+ this.push = push; // handed unchanged to every NotifyBrokerRuntime built by createPushRuntime
+ this.recordType = recordType; // handed unchanged to every NotifyBrokerRuntime built by createPushRuntime
}
@Override
@@ -50,7 +56,7 @@
@Override
public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
- return new IPushRuntime[] { new NotifyBrokerRuntime(ctx, brokerEvalFactory, subEvalFactory,
- channelExecutionEvalFactory, entityId) };
+ return new IPushRuntime[] { new NotifyBrokerRuntime(ctx, brokerEvalFactory, pushListEvalFactory,
+ channelExecutionEvalFactory, entityId, push, recordType) }; // one runtime per call, built from factory fields
}
}
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 02aba78..2d7ba75 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -101,15 +101,18 @@
CreateChannelStatement ccs = null;
String fqFunctionName = null;
Expression period = null;
+ boolean push = false;
}
{
(
- "repetitive" "channel" nameComponents = QualifiedName()
+ "repetitive"
+ ( "push" { push = true; } )?
+ "channel" nameComponents = QualifiedName()
<USING> appliedFunction = FunctionSignature()
"period" period = FunctionCallExpr()
{
ccs = new CreateChannelStatement(nameComponents.first,
- nameComponents.second, appliedFunction, period);
+ nameComponents.second, appliedFunction, period, push);
}
)
{