[NO ISSUE][BAD] DeployedJobEventListener and test case fix

1. The concurrent execution test case sometimes failed at result short.
The reason is the deployed job is removed before all invocations
finished. Added a sleep to the test case, also added running instance
check when dropping the procedure.
2. The DeployedJobEventListner was not registered with
ActiveNotificationHandler. Now it's registered so we can bind multiple
jobs with the event listener in the future.
3. Test cases refactored to make the overall test time shorter.
4. Add `wait-for-completion-procedure` for several test cases to make
sure the result is consistent.

Change-Id: I12ecf5c3c8f5a5c58fefa80673565c0ae3d1c9e6
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index e94b912..291697d 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -28,7 +28,8 @@
     <asterix.version>0.9.3-SNAPSHOT</asterix.version>
     <hyracks.version>0.3.3-SNAPSHOT</hyracks.version>
     <source-format.skip>true</source-format.skip>
-  </properties>
+    <testLog4jConfigFile>${root.dir}/../../asterix-app/src/test/resources/log4j2-test.xml</testLog4jConfigFile>
+    </properties>
     <build>
     <plugins>
       <plugin>
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index cd60b1a..0908edb 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -246,6 +246,7 @@
     private void setupDeployedJobSpec(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
             DeployedJobSpecEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats)
             throws Exception {
+        jobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
         DeployedJobSpecId deployedJobSpecId = hcc.deployJobSpec(jobSpec);
         listener.storeDistributedInfo(deployedJobSpecId, null, hdc, resultSetId);
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index b6c66dc..7ab7f95 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -18,12 +18,6 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
-import java.io.DataOutput;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
 import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
@@ -63,8 +57,16 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
+import java.io.DataOutput;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
 public class ExecuteProcedureStatement implements IExtensionStatement {
 
+    public static final String WAIT_FOR_COMPLETION = "wait-for-completion-procedure";
+
     private final String dataverseName;
     private final String procedureName;
     private final int arity;
@@ -118,6 +120,7 @@
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
+        JobId jobId;
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             txnActive = true;
@@ -130,12 +133,18 @@
             if (procedure.getDuration().equals("")) {
 
                 //Add the Asterix Transaction Id to the map
+                long newTxId = TxnIdFactory.create().getId();
                 contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
-                        String.valueOf(TxnIdFactory.create().getId()).getBytes());
-                JobId jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);
+                        String.valueOf(newTxId).getBytes());
+                jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);
+
+                boolean wait = Boolean.parseBoolean(metadataProvider.getConfig().get(
+                        ExecuteProcedureStatement.WAIT_FOR_COMPLETION));
+                if (wait || listener.getType() == PrecompiledType.QUERY) {
+                    hcc.waitForCompletion(jobId);
+                }
 
                 if (listener.getType() == PrecompiledType.QUERY) {
-                    hcc.waitForCompletion(jobId);
                     ResultReader resultReader =
                             new ResultReader(listener.getResultDataset(), jobId, listener.getResultId());
 
@@ -150,7 +159,6 @@
                 listener.storeDistributedInfo(deployedJobSpecId, ses, listener.getResultDataset(),
                         listener.getResultId());
             }
-
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             txnActive = false;
         } catch (Exception e) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 1555bea..f0eaced 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -93,7 +93,13 @@
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
         DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
-        Procedure procedure = null;
+
+        if (listener.isActive()) {
+            throw new AlgebricksException("Cannot drop running procedure. There are " + listener.getRunningInstance()
+                    + " running instances.");
+        }
+
+        Procedure procedure;
 
         MetadataTransactionContext mdTxnCtx = null;
         try {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
index 13f9e0d..070c148 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
@@ -18,19 +18,12 @@
  */
 package org.apache.asterix.bad.metadata;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -42,6 +35,11 @@
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
 public class DeployedJobSpecEventListener implements IActiveEntityEventsListener {
 
     private static final Logger LOGGER = Logger.getLogger(DeployedJobSpecEventListener.class);
@@ -62,11 +60,11 @@
 
     private DeployedJobSpecId deployedJobSpecId;
     private ScheduledExecutorService executorService = null;
-    private ResultReader resultReader;
     private final PrecompiledType type;
 
     private IHyracksDataset hdc;
     private ResultSetId resultSetId;
+
     // members
     protected volatile ActivityState state;
     protected JobId jobId;
@@ -79,7 +77,7 @@
     protected RequestState statsRequestState;
     protected final String runtimeName;
     protected final AlgebricksAbsolutePartitionConstraint locations;
-    protected int numRegistered;
+    private int runningInstance;
 
     public DeployedJobSpecEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type,
             AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
@@ -92,7 +90,6 @@
         this.stats = "{\"Stats\":\"N/A\"}";
         this.runtimeName = runtimeName;
         this.locations = locations;
-        this.numRegistered = 0;
         state = ActivityState.STOPPED;
         this.type = type;
     }
@@ -110,15 +107,6 @@
         return deployedJobSpecId;
     }
 
-    protected synchronized void handle(ActivePartitionMessage message) {
-        if (message.getEvent() == ActivePartitionMessage.Event.RUNTIME_REGISTERED) {
-            numRegistered++;
-            if (numRegistered == locations.getLocations().length) {
-                state = ActivityState.RUNNING;
-            }
-        }
-    }
-
     @Override
     public EntityId getEntityId() {
         return entityId;
@@ -182,10 +170,6 @@
         return locations;
     }
 
-    public ResultReader getResultReader() {
-        return resultReader;
-    }
-
     public PrecompiledType getType() {
         return type;
     }
@@ -234,6 +218,7 @@
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Channel Job started for  " + entityId);
         }
+        runningInstance++;
         state = ActivityState.RUNNING;
     }
 
@@ -241,6 +226,10 @@
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Channel Job finished for  " + entityId);
         }
+        runningInstance--;
+        if (runningInstance == 0) {
+            state = ActivityState.STOPPED;
+        }
     }
 
     @Override
@@ -266,4 +255,8 @@
     public String getDisplayName() throws HyracksDataException {
         return this.entityId.toString();
     }
+
+    public int getRunningInstance() {
+        return runningInstance;
+    }
 }
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
index bc24e9f..9701b2b 100644
--- a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.logging.Logger;
 
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
@@ -61,7 +62,7 @@
     public static void setUp() throws Exception {
         File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();
-        ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME);
+        ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME, new AsterixHyracksIntegrationUtil(), false, null);
     }
 
     @AfterClass
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
index 11b7b33..a21a4be 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
@@ -43,4 +43,4 @@
 
 create broker brokerA at "http://www.notifyA.com";
 
-create repetitive channel roomRecords using RoomOccupants@1 period duration("PT30S");
+create repetitive channel roomRecords using RoomOccupants@1 period duration("PT10S");
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
index d5f4290..c750707 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
@@ -22,4 +22,4 @@
 * Date         : Sep 2016
 * Author       : Steven Jacobs
 */
-630000
+110000
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
index cfe92c9..b9282fe 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
@@ -25,4 +25,4 @@
 
 use channels;
 
-(select value count(result) from roomRecordsResults)[0] > 19;
\ No newline at end of file
+(select value count(result) from roomRecordsResults)[0] > 9;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.sleep.sqlpp
new file mode 100644
index 0000000..c938d6c
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.sleep.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Insert Procedure
+* Expected Res : 3
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+3000
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.ddl.sqlpp
similarity index 100%
rename from asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.ddl.sqlpp
rename to asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.ddl.sqlpp
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.5.query.sqlpp
similarity index 100%
rename from asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.query.sqlpp
rename to asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.5.query.sqlpp
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.sqlpp
index dd9e350..f72c921 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.sqlpp
@@ -24,4 +24,5 @@
 */
 
 use channels;
+set `wait-for-completion-procedure` "true";
 execute deleteAll();
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp
index 8d03794..7dc0661 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp
@@ -24,4 +24,5 @@
 */
 
 use channels;
+set `wait-for-completion-procedure` "true";
 execute deleteSome(108,"jacob");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.sqlpp
index 8610395..8d55ccd 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.sqlpp
@@ -24,4 +24,5 @@
 */
 
 use channels;
+set `wait-for-completion-procedure` "true";
 execute addMe();
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.sqlpp
index 8610395..8d55ccd 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.sqlpp
@@ -24,4 +24,5 @@
 */
 
 use channels;
+set `wait-for-completion-procedure` "true";
 execute addMe();
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.sqlpp
index 8610395..8d55ccd 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.sqlpp
@@ -24,4 +24,5 @@
 */
 
 use channels;
+set `wait-for-completion-procedure` "true";
 execute addMe();