Coordinated change for https://asterix-gerrit.ics.uci.edu/#/c/2344/
Change-Id: I973c67448d4b34c4521d0abd23c999397e88cf67
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index 291697d..20e97b2 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -282,11 +282,6 @@
</dependency>
<dependency>
<groupId>org.apache.asterix</groupId>
- <artifactId>asterix-lang-common</artifactId>
- <version>${asterix.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
<artifactId>asterix-metadata</artifactId>
<version>${asterix.version}</version>
</dependency>
@@ -296,11 +291,6 @@
<version>${asterix.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-transactions</artifactId>
- <version>${asterix.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
<version>${hyracks.version}</version>
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index feaa3ca..53baf6d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -40,6 +40,7 @@
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -256,11 +257,11 @@
}
private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
- DeployedJobSpecEventListener listener) throws Exception {
+ DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory) throws Exception {
if (channeljobSpec != null) {
DeployedJobSpecId destributedId = hcc.deployJobSpec(channeljobSpec);
ScheduledExecutorService ses = DeployedJobService.startRepetitiveDeployedJobSpec(destributedId, hcc,
- ChannelJobService.findPeriod(duration), new HashMap<>(), entityId);
+ ChannelJobService.findPeriod(duration), new HashMap<>(), entityId, txnIdFactory);
listener.storeDistributedInfo(destributedId, ses, null, null);
}
@@ -331,7 +332,7 @@
activeEventHandler.registerListener(listener);
}
- setupExecutorJob(entityId, channeljobSpec, hcc, listener);
+ setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory());
channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
duration, null);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 7ab7f95..7db935a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -18,6 +18,12 @@
*/
package org.apache.asterix.bad.lang.statement;
+import java.io.DataOutput;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.apache.asterix.active.DeployedJobService;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
@@ -45,7 +51,6 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
import org.apache.asterix.translator.ConstantHelper;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
@@ -57,12 +62,6 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import java.io.DataOutput;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
public class ExecuteProcedureStatement implements IExtensionStatement {
public static final String WAIT_FOR_COMPLETION = "wait-for-completion-procedure";
@@ -133,7 +132,7 @@
if (procedure.getDuration().equals("")) {
//Add the Asterix Transaction Id to the map
- long newTxId = TxnIdFactory.create().getId();
+ long newTxId = metadataProvider.getTxnIdFactory().create().getId();
contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
String.valueOf(newTxId).getBytes());
jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);
@@ -153,9 +152,9 @@
}
} else {
- ScheduledExecutorService ses =
- DeployedJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
- ChannelJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId);
+ ScheduledExecutorService ses = DeployedJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
+ ChannelJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId,
+ metadataProvider.getTxnIdFactory());
listener.storeDistributedInfo(deployedJobSpecId, ses, listener.getResultDataset(),
listener.getResultId());
}