[ASTERIXDB-2314][HYR] Dataset in class names in Hyracks
Change-Id: I333fa410df5efe7da9d4f0e9b7143f9f6928b88b
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
index c48ec54..5ec4852 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -51,11 +51,11 @@
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.ResultSetId;
/**
* Provides functionality for channel jobs
@@ -122,7 +122,7 @@
}
public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
- IHyracksDataset hdc, Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
+ IResultSet resultSet, Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
ICcApplicationContext appCtx, DeployedJobSpecEventListener listener, QueryTranslator statementExecutor)
throws Exception {
listener.waitWhileAtState(ActivityState.SUSPENDED);
@@ -138,7 +138,7 @@
long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
if (listener.getType() == DeployedJobSpecEventListener.PrecompiledType.QUERY) {
- ResultReader resultReader = new ResultReader(hdc, jobId, new ResultSetId(0));
+ ResultReader resultReader = new ResultReader(resultSet, jobId, new ResultSetId(0));
ResultUtil.printResults(appCtx, resultReader, statementExecutor.getSessionOutput(),
new IStatementExecutor.Stats(), null);
@@ -235,7 +235,7 @@
//Procedures
metadataProvider.setResultSetId(new ResultSetId(0));
IStatementExecutor.ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
- IHyracksDataset hdc = requestParameters.getHyracksDataset();
+ IResultSet hdc = requestParameters.getResultSet();
IStatementExecutor.Stats stats = requestParameters.getStats();
boolean resultsAsync = resultDelivery == IStatementExecutor.ResultDelivery.ASYNC
|| resultDelivery == IStatementExecutor.ResultDelivery.DEFERRED;
@@ -272,12 +272,12 @@
}
private static JobSpecification compileProcedureJob(IStatementExecutor statementExecutor,
- MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc,
+ MetadataProvider metadataProvider, IHyracksClientConnection hcc, IResultSet resultSet,
IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception {
if (procedureStatement.getKind() == Statement.Kind.INSERT) {
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
- procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null,
- null, null);
+ procedureStatement, hcc, resultSet, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true,
+ null, null, null);
} else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement);
} else {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index cff1eaa..f6f17f3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -56,9 +56,9 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.ResultSetId;
public class ChannelSubscribeStatement extends ExtensionStatement {
@@ -184,7 +184,7 @@
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
- final IHyracksDataset hdc = requestParameters.getHyracksDataset();
+ final IResultSet resultSet = requestParameters.getResultSet();
final Stats stats = requestParameters.getStats();
if (subscriptionId == null) {
//To create a new subscription
@@ -207,14 +207,14 @@
InsertStatement insert = new InsertStatement(new Identifier(dataverse),
new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, accessor);
- ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc,
- resultDelivery, null, stats, false, null, null, null);
+ ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc,
+ resultSet, resultDelivery, null, stats, false, null, null, null);
} else {
//To update an existing subscription
UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
- ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc,
- resultDelivery, null, stats, false, null, null, null);
+ ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc,
+ resultSet, resultDelivery, null, stats, false, null, null, null);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index edadeb6..0ddb1c3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -72,9 +72,9 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
public class CreateChannelStatement extends ExtensionStatement {
@@ -215,7 +215,7 @@
}
private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception {
+ IHyracksClientConnection hcc, IResultSet resultSet, Stats stats) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("SET inline_with \"false\";\n");
if (!push) {
@@ -253,7 +253,7 @@
(Query) fStatements.get(1));
}
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
- hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null, null);
+ hcc, resultSet, ResultDelivery.ASYNC, null, stats, true, null, null, null);
}
@Override
@@ -306,13 +306,14 @@
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
- final IHyracksDataset hdc = requestContext.getHyracksDataset();
+ final IResultSet resultSet = requestContext.getResultSet();
final Stats stats = requestContext.getStats();
//Create Channel Datasets
createDatasets(statementExecutor, tempMdProvider, hcc);
tempMdProvider.getLocks().reset();
//Create Channel Internal Job
- JobSpecification channeljobSpec = createChannelJob(statementExecutor, tempMdProvider, hcc, hdc, stats);
+ JobSpecification channeljobSpec =
+ createChannelJob(statementExecutor, tempMdProvider, hcc, resultSet, stats);
// Now we subscribe
if (listener == null) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index 6459b4c..ce8f1d2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -73,10 +73,10 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
public class CreateProcedureStatement extends ExtensionStatement {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 69f413e..2c9d361 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -119,7 +119,7 @@
Map<byte[], byte[]> contextRuntimeVarMap = createParameterMap(procedure);
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
if (procedure.getDuration().equals("")) {
- BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, requestParameters.getHyracksDataset(),
+ BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, requestParameters.getResultSet(),
contextRuntimeVarMap, entityId, metadataProvider.getTxnIdFactory(), appCtx, listener,
(QueryTranslator) statementExecutor);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
index f358986..d2a6613 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
@@ -56,7 +56,7 @@
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.client.result.ResultSet;
import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
public class BADGlobalRecoveryManager extends GlobalRecoveryManager {
@@ -146,7 +146,7 @@
activeEventHandler.registerListener(listener);
BADJobService.redeployJobSpec(entityId, procedure.getBody(), metadataProvider, badStatementExecutor, hcc,
new RequestParameters(
- new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(),
+ new ResultSet(hcc, appCtx.getCompilerProperties().getFrameSize(),
ResultReader.NUM_READERS),
new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
new IStatementExecutor.Stats(), null, null, null, null, true),