[BAD][RT] EventListener change to accommodate the new interfaces

Adapt the PrecompiledJobEventListener to follow the new
ActiveEntityEventsListener interfaces: delegate to the new superclass
constructor (which now takes the ICcApplicationContext,
partition-constraint locations, and a runtime name), implement the new
refreshStats(long) callback as a no-op, and change subscribe to accept
an IActiveEventSubscriber instead of returning one for a given
ActivityState. Update the call sites in CreateChannelStatement and
CreateProcedureStatement accordingly.

Change-Id: I121acc01f2bb56ce2bf43f6358da9158d7c7e7f7
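
For reviewers, a minimal sketch of how call sites construct the listener
under the new superclass constructor. It mirrors the CreateChannelStatement
change below; appCtx, entityId, and activeEventHandler are assumed to be in
scope as in CreateChannelStatement.handle(), and the sketch is illustrative
rather than a compilable excerpt of the patch:

    // Call-site sketch: the listener now forwards the application context,
    // locations, and a runtime name to the ActiveEntityEventsListener
    // superclass instead of storing entityId/datasets itself.
    List<IDataset> datasets = new ArrayList<>();
    PrecompiledJobEventListener listener = new PrecompiledJobEventListener(
            appCtx,                  // ICcApplicationContext, newly required
            entityId,                // EntityId of the channel or procedure
            PrecompiledType.CHANNEL, // precompiled job type
            datasets,                // datasets touched by the active job
            null,                    // AlgebricksAbsolutePartitionConstraint
                                     // locations (none provided yet)
            "BadListener");          // runtime name for the superclass
    activeEventHandler.registerListener(listener);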
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 09cc3e5..7d91d2e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -21,10 +21,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -70,6 +67,7 @@
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
@@ -133,13 +131,11 @@
         return channelResultsInsertQuery;
     }
 
-    @Override
-    public byte getCategory() {
+    @Override public byte getCategory() {
         return Category.DDL;
     }
 
-    @Override
-    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+    @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
         return null;
     }
 
@@ -164,8 +160,7 @@
 
     }
 
-    @Override
-    public byte getKind() {
+    @Override public byte getKind() {
         return Kind.EXTENSION;
     }
 
@@ -193,13 +188,14 @@
         fieldNames.add(BADConstants.ResultId);
         partitionFields.add(fieldNames);
         idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
-        DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName,
-                new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
-                new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
+        DatasetDecl createResultsDataset =
+                new DatasetDecl(new Identifier(dataverse), resultsName, new Identifier(BADConstants.BAD_DATAVERSE_NAME),
+                        resultsTypeName, null, null, null, null, new HashMap<String, String>(),
+                        new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
 
         //Run both statements to create datasets
-        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
-                hcc);
+        ((QueryTranslator) statementExecutor)
+                .handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset, hcc);
         metadataProvider.getLocks().reset();
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
 
@@ -235,8 +231,9 @@
         SetStatement ss = (SetStatement) fStatements.get(0);
         metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
 
-        return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
-                hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null);
+        return ((QueryTranslator) statementExecutor)
+                .handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc, hdc, ResultDelivery.ASYNC, null,
+                        stats, true, null, null);
     }
 
     private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
@@ -247,15 +244,15 @@
             if (predistributed) {
                 jobId = hcc.distributeJob(channeljobSpec);
             }
-            ScheduledExecutorService ses = ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
-                    jobId, hcc, ChannelJobService.findPeriod(duration));
+            ScheduledExecutorService ses = ChannelJobService
+                    .startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class), jobId, hcc,
+                            ChannelJobService.findPeriod(duration));
             listener.storeDistributedInfo(jobId, ses, null);
         }
 
     }
 
-    @Override
-    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+    @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
 
@@ -310,12 +307,12 @@
                     metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
             tempMdProvider.setConfig(metadataProvider.getConfig());
             //Create Channel Datasets
-            createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc,
-                    dataverse);
+            createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, dataverse);
             tempMdProvider.getLocks().reset();
             //Create Channel Internal Job
-            JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
-                    tempMdProvider, hcc, hdc, stats, dataverse);
+            JobSpecification channeljobSpec =
+                    createChannelJob(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, stats,
+                            dataverse);
 
             // Now we subscribe
             if (listener == null) {
@@ -323,7 +320,8 @@
                 datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()));
                 datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()));
                 //TODO: Add datasets used by channel function
-                listener = new PrecompiledJobEventListener(entityId, PrecompiledType.CHANNEL, datasets);
+                listener = new PrecompiledJobEventListener(appCtx, entityId, PrecompiledType.CHANNEL, datasets, null,
+                        "BadListener");
                 activeEventHandler.registerListener(listener);
             }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index dfc3ed3..d203905 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -96,8 +96,7 @@
         this.period = (CallExpr) period;
     }
 
-    @Override
-    public byte getKind() {
+    @Override public byte getKind() {
         return Kind.EXTENSION;
     }
 
@@ -109,8 +108,7 @@
         return signature;
     }
 
-    @Override
-    public byte getCategory() {
+    @Override public byte getCategory() {
         return Category.DDL;
     }
 
@@ -118,8 +116,7 @@
         return period;
     }
 
-    @Override
-    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+    @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
         return null;
     }
 
@@ -170,10 +167,9 @@
             throw new CompilationException("Procedure can only execute a single statement");
         }
         if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
-            return new Pair<>(
-                    ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
-                            fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null),
-                    PrecompiledType.INSERT);
+            return new Pair<>(((QueryTranslator) statementExecutor)
+                    .handleInsertUpsertStatement(metadataProvider, fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC,
+                            null, stats, true, null, null), PrecompiledType.INSERT);
         } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
             Pair<JobSpecification, PrecompiledType> pair =
                     new Pair<>(compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(0)),
@@ -183,8 +179,8 @@
         } else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
             SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
             fStatements.get(0).accept(visitor, null);
-            return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
-                    fStatements.get(0), hcc, true), PrecompiledType.DELETE);
+            return new Pair<>(((QueryTranslator) statementExecutor)
+                    .handleDeleteStatement(metadataProvider, fStatements.get(0), hcc, true), PrecompiledType.DELETE);
         } else {
             throw new CompilationException("Procedure can only execute a single delete, insert, or query");
         }
@@ -197,8 +193,7 @@
         listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId));
     }
 
-    @Override
-    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+    @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
@@ -217,8 +212,8 @@
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(),
-                    Integer.toString(signature.getArity()));
+            procedure = BADLangExtension
+                    .getProcedure(mdTxnCtx, dataverse, signature.getName(), Integer.toString(signature.getArity()));
             if (procedure != null) {
                 throw new AlgebricksException("A procedure with this name " + signature.getName() + " already exists.");
             }
@@ -252,7 +247,8 @@
             // Now we subscribe
             if (listener == null) {
                 //TODO: Add datasets used by channel function
-                listener = new PrecompiledJobEventListener(entityId, procedureJobSpec.second, new ArrayList<>());
+                listener = new PrecompiledJobEventListener(appCtx, entityId, procedureJobSpec.second, new ArrayList<>(),
+                        null, "BadListener");
                 activeEventHandler.registerListener(listener);
             }
             setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), hdc,
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
index 55547ea..5eb18d1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
@@ -26,8 +26,10 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEventSubscriber;
 import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.log4j.Logger;
@@ -47,9 +49,9 @@
 
     private final PrecompiledType type;
 
-    public PrecompiledJobEventListener(EntityId entityId, PrecompiledType type, List<IDataset> datasets) {
-        this.entityId = entityId;
-        this.datasets = datasets;
+    public PrecompiledJobEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type,
+            List<IDataset> datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
+        super(appCtx, entityId, datasets, locations, runtimeName);
         state = ActivityState.STOPPED;
         this.type = type;
     }
@@ -99,6 +101,11 @@
         }
     }
 
+    @Override
+    public void refreshStats(long l) throws HyracksDataException {
+        // no op
+    }
+
     private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Channel Job started for  " + entityId);
@@ -113,7 +120,7 @@
     }
 
     @Override
-    public IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException {
-        return null;
+    public synchronized void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException {
+        // no op
     }
 }
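
Note on the subscribe change: the listener no longer manufactures an
IActiveEventSubscriber for a requested ActivityState; the caller now builds
the subscriber and hands it in (still a no-op in this listener). A toy,
self-contained illustration of that inversion, using simplified stand-in
types rather than the real AsterixDB interfaces:

    // Simplified stand-ins, for illustration only.
    interface Subscriber {
        void onEvent(String event);
    }

    class OldStyleListener {
        // Old contract: the listener creates and returns a subscriber
        // for a requested state.
        Subscriber subscribe(String activityState) {
            return event -> System.out.println(activityState + " saw " + event);
        }
    }

    class NewStyleListener {
        // New contract: the caller supplies the subscriber; the listener
        // registers it (or, as in this patch, ignores it).
        private final java.util.List<Subscriber> subscribers = new java.util.ArrayList<>();

        synchronized void subscribe(Subscriber s) {
            subscribers.add(s);
        }
    }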