[ASTERIXDB-2314][HYR] Dataset in class names in Hyracks
- user model changes: no
- storage format changes: no
- interface changes: yes
Change-Id: I260add3a2dfb4a722440af5008a1a75f9446579e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2478
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 0dbd3aa..58f0997 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -22,14 +22,14 @@
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.result.IResultSet;
public interface IRequestParameters {
/**
- * @return A Hyracks dataset client object that is used to read the results.
+ * @return A Resultset client object that is used to read the results.
*/
- IHyracksDataset getHyracksDataset();
+ IResultSet getResultSet();
/**
* Gets the required result properties of the request.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index fa967f5..db9b743 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -36,7 +36,7 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index c5420ba..9844900 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -19,19 +19,16 @@
package org.apache.asterix.api.http.server;
import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
import java.io.PrintWriter;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
-import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.http.server.AbstractServlet;
import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.logging.log4j.Level;
@@ -105,31 +102,16 @@
this.appCtx = appCtx;
}
- protected IHyracksDataset getHyracksDataset() throws Exception { // NOSONAR
+ protected IResultSet getResultSet() throws Exception { // NOSONAR
try {
- return doGetHyracksDataset();
+ return ServletUtil.getResultSet(getHyracksClientConnection(), appCtx, ctx);
} catch (IPCException e) {
LOGGER.log(Level.WARN, "Failed getting hyracks dataset connection. Resetting hyracks connection.", e);
ctx.put(HYRACKS_CONNECTION_ATTR, appCtx.getHcc());
- return doGetHyracksDataset();
+ return ServletUtil.getResultSet(getHyracksClientConnection(), appCtx, ctx);
}
}
- protected IHyracksDataset doGetHyracksDataset() throws Exception {
- IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
- if (hds == null) {
- synchronized (ctx) {
- hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
- if (hds == null) {
- hds = new HyracksDataset(getHyracksClientConnection(),
- appCtx.getCompilerProperties().getFrameSize(), ResultReader.NUM_READERS);
- ctx.put(HYRACKS_DATASET_ATTR, hds);
- }
- }
- }
- return hds;
- }
-
protected IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
if (hcc == null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 466757e..b2c7c14 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -19,7 +19,6 @@
package org.apache.asterix.api.http.server;
import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
import java.awt.image.BufferedImage;
import java.io.BufferedReader;
@@ -33,7 +32,6 @@
import javax.imageio.ImageIO;
-import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.translator.RequestParameters;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -54,8 +52,7 @@
import org.apache.asterix.translator.SessionConfig.PlanFormat;
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
@@ -137,17 +134,7 @@
}
try {
IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
- IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
- if (hds == null) {
- synchronized (ctx) {
- hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
- if (hds == null) {
- hds = new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(),
- ResultReader.NUM_READERS);
- ctx.put(HYRACKS_DATASET_ATTR, hds);
- }
- }
- }
+ IResultSet resultSet = ServletUtil.getResultSet(hcc, appCtx, ctx);
IParser parser = parserFactory.createParser(query);
List<Statement> aqlStatements = parser.parse();
SessionConfig sessionConfig = new SessionConfig(format, true, isSet(executeQuery), true, planFormat);
@@ -163,7 +150,7 @@
double duration;
long startTime = System.currentTimeMillis();
final IRequestParameters requestParameters =
- new RequestParameters(hds, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
+ new RequestParameters(resultSet, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
new IStatementExecutor.Stats(), null, null, null, null, true);
translator.compileAndExecute(hcc, null, requestParameters);
long endTime = System.currentTimeMillis();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 79c17c7..77b8647 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -45,7 +45,7 @@
import org.apache.asterix.translator.SessionOutput;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hyracks.api.application.INCServiceContext;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.http.api.IChannelClosedHandler;
import org.apache.hyracks.http.api.IServletRequest;
@@ -128,7 +128,7 @@
if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) {
stats.setProcessedObjects(responseMsg.getStats().getProcessedObjects());
for (Triple<JobId, ResultSetId, ARecordType> rsmd : resultMetadata.getResultSets()) {
- ResultReader resultReader = new ResultReader(getHyracksDataset(), rsmd.getLeft(), rsmd.getMiddle());
+ ResultReader resultReader = new ResultReader(getResultSet(), rsmd.getLeft(), rsmd.getMiddle());
ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, rsmd.getRight());
}
} else {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index de96d54..6781f22 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -26,8 +26,8 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.SessionOutput;
-import org.apache.hyracks.api.dataset.DatasetJobRecord;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.result.ResultJobRecord;
+import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.http.api.IServletRequest;
@@ -59,11 +59,11 @@
return;
}
- IHyracksDataset hds = getHyracksDataset();
- ResultReader resultReader = new ResultReader(hds, handle.getJobId(), handle.getResultSetId());
+ IResultSet resultSet = getResultSet();
+ ResultReader resultReader = new ResultReader(resultSet, handle.getJobId(), handle.getResultSetId());
try {
- DatasetJobRecord.Status status = resultReader.getStatus();
+ ResultJobRecord.Status status = resultReader.getStatus();
final HttpResponseStatus httpStatus;
if (status == null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 0c5d216..54b5baa 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -593,8 +593,8 @@
Map<String, IAObject> stmtParams =
org.apache.asterix.app.translator.RequestParameters.deserializeParameterValues(statementParameters);
IRequestParameters requestParameters =
- new org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(), resultProperties, stats,
- null, param.getClientContextID(), optionalParameters, stmtParams, param.isMultiStatement());
+ new org.apache.asterix.app.translator.RequestParameters(getResultSet(), resultProperties, stats, null,
+ param.getClientContextID(), optionalParameters, stmtParams, param.isMultiStatement());
translator.compileAndExecute(getHyracksClientConnection(), queryCtx, requestParameters);
execution.end();
printExecutionPlans(sessionOutput, translator.getExecutionPlans());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index e55d82a..3773dc5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -28,8 +28,8 @@
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IApplicationContext;
-import org.apache.hyracks.api.dataset.DatasetJobRecord;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.ResultJobRecord;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -55,10 +55,9 @@
return;
}
- IHyracksDataset hds = getHyracksDataset();
- ResultReader resultReader = new ResultReader(hds, handle.getJobId(), handle.getResultSetId());
+ ResultReader resultReader = new ResultReader(getResultSet(), handle.getJobId(), handle.getResultSetId());
- final DatasetJobRecord.Status resultReaderStatus = resultReader.getStatus();
+ final ResultJobRecord.Status resultReaderStatus = resultReader.getStatus();
if (resultReaderStatus == null) {
LOGGER.log(Level.INFO, "No results for: \"" + strHandle + "\"");
response.setStatus(HttpResponseStatus.NOT_FOUND);
@@ -96,7 +95,7 @@
}
}
- ResultStatus resultStatus(DatasetJobRecord.Status status) {
+ ResultStatus resultStatus(ResultJobRecord.Status status) {
switch (status.getState()) {
case IDLE:
case RUNNING:
@@ -110,7 +109,7 @@
}
}
- Exception extractException(DatasetJobRecord.Status status) {
+ Exception extractException(ResultJobRecord.Status status) {
switch (status.getState()) {
case FAILED:
List<Exception> exceptions = status.getExceptions();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 3c58bc6..99c7308 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -19,13 +19,11 @@
package org.apache.asterix.api.http.server;
import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
-import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.app.translator.RequestParameters;
import org.apache.asterix.common.config.GlobalConfig;
@@ -48,8 +46,7 @@
import org.apache.asterix.translator.SessionConfig.PlanFormat;
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
@@ -192,24 +189,14 @@
try {
response.setStatus(HttpResponseStatus.OK);
IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
- IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
- if (hds == null) {
- synchronized (ctx) {
- hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
- if (hds == null) {
- hds = new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(),
- ResultReader.NUM_READERS);
- ctx.put(HYRACKS_DATASET_ATTR, hds);
- }
- }
- }
IParser parser = parserFactory.createParser(query);
List<Statement> aqlStatements = parser.parse();
validate(aqlStatements);
MetadataManager.INSTANCE.init();
IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
compilationProvider, componentProvider);
- final IRequestParameters requestParameters = new RequestParameters(hds,
+ final IResultSet resultSet = ServletUtil.getResultSet(hcc, appCtx, ctx);
+ final IRequestParameters requestParameters = new RequestParameters(resultSet,
new ResultProperties(resultDelivery), new IStatementExecutor.Stats(), null, null, null, null, true);
translator.compileAndExecute(hcc, null, requestParameters);
} catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
index 2fe37c3..f62d0c4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
@@ -20,7 +20,7 @@
public class ServletConstants {
public static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
- public static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET";
+ public static final String RESULTSET_ATTR = "org.apache.asterix.RESULTSET";
public static final String ASTERIX_APP_CONTEXT_INFO_ATTR = "org.apache.asterix.APP_CONTEXT_INFO";
public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE";
public static final String RUNNING_QUERIES_ATTR = "org.apache.asterix.RUNINNG_QUERIES";
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java
new file mode 100644
index 0000000..17e4c16
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import static org.apache.asterix.api.http.server.ServletConstants.RESULTSET_ATTR;
+
+import java.util.Map;
+
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.client.result.ResultSet;
+
+public class ServletUtil {
+ static IResultSet getResultSet(IHyracksClientConnection hcc, IApplicationContext appCtx,
+ final Map<String, Object> ctx) throws Exception {
+ IResultSet resultSet = (IResultSet) ctx.get(RESULTSET_ATTR);
+ if (resultSet == null) {
+ synchronized (ctx) {
+ resultSet = (IResultSet) ctx.get(RESULTSET_ATTR);
+ if (resultSet == null) {
+ resultSet =
+ new ResultSet(hcc, appCtx.getCompilerProperties().getFrameSize(), ResultReader.NUM_READERS);
+ ctx.put(RESULTSET_ATTR, resultSet);
+ }
+ }
+ }
+ return resultSet;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java
index bbb9b99..a17fa90 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.app.result;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.job.JobId;
public class ResultHandle {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
index eeb01ba..908e3ca 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
@@ -20,24 +20,24 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.result.IResultSetReader;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
public class ResultReader {
- private IHyracksDatasetReader reader;
+ private IResultSetReader reader;
private IFrameTupleAccessor frameTupleAccessor;
// Number of parallel result reader buffers
public static final int NUM_READERS = 1;
- public ResultReader(IHyracksDataset hdc, JobId jobId, ResultSetId resultSetId) throws HyracksDataException {
- reader = hdc.createReader(jobId, resultSetId);
+ public ResultReader(IResultSet resultSet, JobId jobId, ResultSetId resultSetId) throws HyracksDataException {
+ reader = resultSet.createReader(jobId, resultSetId);
frameTupleAccessor = new ResultFrameTupleAccessor();
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 5b4c198..02941d1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -186,8 +186,6 @@
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.io.FileSplit;
@@ -196,6 +194,8 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
@@ -281,7 +281,7 @@
String threadName = Thread.currentThread().getName();
Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
Map<String, String> config = new HashMap<>();
- final IHyracksDataset hdc = requestParameters.getHyracksDataset();
+ final IResultSet resultSet = requestParameters.getResultSet();
final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
final long maxResultReads = requestParameters.getResultProperties().getMaxReads();
final Stats stats = requestParameters.getStats();
@@ -355,7 +355,7 @@
|| resultDelivery == ResultDelivery.DEFERRED);
metadataProvider.setMaxResultReads(maxResultReads);
}
- handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, outMetadata,
+ handleInsertUpsertStatement(metadataProvider, stmt, hcc, resultSet, resultDelivery, outMetadata,
stats, false, clientContextId, stmtParams, stmtRewriter);
break;
case DELETE:
@@ -390,7 +390,7 @@
metadataProvider.setResultAsyncMode(
resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
metadataProvider.setMaxResultReads(maxResultReads);
- handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, outMetadata, stats,
+ handleQuery(metadataProvider, (Query) stmt, hcc, resultSet, resultDelivery, outMetadata, stats,
clientContextId, ctx, stmtParams, stmtRewriter);
break;
case COMPACT:
@@ -1867,7 +1867,7 @@
}
public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
+ IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
ResultMetadata outMetadata, Stats stats, boolean compileOnly, String clientContextId,
Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception {
InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
@@ -1912,7 +1912,7 @@
}
if (stmtInsertUpsert.getReturnExpression() != null) {
- deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
+ deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
clientContextId, NoOpStatementExecutorContext.INSTANCE);
} else {
locker.lock();
@@ -2465,7 +2465,7 @@
}
protected void handleQuery(MetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
- IHyracksDataset hdc, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats,
+ IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats,
String clientContextId, IStatementExecutorContext ctx, Map<String, IAObject> stmtParams,
IStatementRewriter stmtRewriter) throws Exception {
final IMetadataLocker locker = new IMetadataLocker() {
@@ -2499,11 +2499,11 @@
throw e;
}
};
- deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, clientContextId,
- ctx);
+ deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
+ clientContextId, ctx);
}
- private void deliverResult(IHyracksClientConnection hcc, IHyracksDataset hdc, IStatementCompiler compiler,
+ private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
throws Exception {
@@ -2521,7 +2521,7 @@
break;
case IMMEDIATE:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
- final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
+ final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId);
updateJobStats(id, stats);
// stop buffering and allow for streaming result delivery
sessionOutput.release();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
index ad12125..5f76568 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
@@ -33,15 +33,15 @@
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.ResultProperties;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
import com.fasterxml.jackson.databind.JsonNode;
public class RequestParameters implements IRequestParameters {
- private final IHyracksDataset hdc;
+ private final IResultSet resultSet;
private final ResultProperties resultProperties;
private final Stats stats;
private final Map<String, String> optionalParameters;
@@ -50,10 +50,10 @@
private final Map<String, IAObject> statementParameters;
private final boolean multiStatement;
- public RequestParameters(IHyracksDataset hdc, ResultProperties resultProperties, Stats stats,
+ public RequestParameters(IResultSet resultSet, ResultProperties resultProperties, Stats stats,
IStatementExecutor.ResultMetadata outMetadata, String clientContextId,
Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement) {
- this.hdc = hdc;
+ this.resultSet = resultSet;
this.resultProperties = resultProperties;
this.stats = stats;
this.outMetadata = outMetadata;
@@ -64,8 +64,8 @@
}
@Override
- public IHyracksDataset getHyracksDataset() {
- return hdc;
+ public IResultSet getResultSet() {
+ return resultSet;
}
@Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f3f5c56..695dcb8 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -122,7 +122,7 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.primitive.ShortPointable;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ResultSetSinkId.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ResultSetSinkId.java
index 51feb56..2fb7344 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ResultSetSinkId.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ResultSetSinkId.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.metadata.declared;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
public class ResultSetSinkId {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index ad2e77a..72bdc3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -25,12 +25,12 @@
import java.util.Map;
import java.util.Set;
-import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultSetId;
public class HyracksClientInterfaceFunctions {
public enum FunctionId {
@@ -43,9 +43,9 @@
UNDEPLOY_JOB,
REDEPLOY_JOB,
CANCEL_JOB,
- GET_DATASET_DIRECTORY_SERIVICE_INFO,
- GET_DATASET_RESULT_STATUS,
- GET_DATASET_RESULT_LOCATIONS,
+ GET_RESULT_DIRECTORY_ADDRESS,
+ GET_RESULT_STATUS,
+ GET_RESULT_LOCATIONS,
WAIT_FOR_COMPLETION,
GET_NODE_CONTROLLERS_INFO,
CLI_DEPLOY_BINARY,
@@ -250,30 +250,30 @@
}
}
- public static class GetDatasetDirectoryServiceInfoFunction extends Function {
+ public static class GetResultDirectoryAddressFunction extends Function {
private static final long serialVersionUID = 1L;
@Override
public FunctionId getFunctionId() {
- return FunctionId.GET_DATASET_DIRECTORY_SERIVICE_INFO;
+ return FunctionId.GET_RESULT_DIRECTORY_ADDRESS;
}
}
- public static class GetDatasetResultStatusFunction extends Function {
+ public static class GetResultStatusFunction extends Function {
private static final long serialVersionUID = 1L;
private final JobId jobId;
private final ResultSetId rsId;
- public GetDatasetResultStatusFunction(JobId jobId, ResultSetId rsId) {
+ public GetResultStatusFunction(JobId jobId, ResultSetId rsId) {
this.jobId = jobId;
this.rsId = rsId;
}
@Override
public FunctionId getFunctionId() {
- return FunctionId.GET_DATASET_RESULT_STATUS;
+ return FunctionId.GET_RESULT_STATUS;
}
public JobId getJobId() {
@@ -285,16 +285,16 @@
}
}
- public static class GetDatasetResultLocationsFunction extends Function {
+ public static class GetResultLocationsFunction extends Function {
private static final long serialVersionUID = 1L;
private final JobId jobId;
private final ResultSetId rsId;
- private final DatasetDirectoryRecord[] knownRecords;
+ private final ResultDirectoryRecord[] knownRecords;
- public GetDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords) {
+ public GetResultLocationsFunction(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownRecords) {
this.jobId = jobId;
this.rsId = rsId;
this.knownRecords = knownRecords;
@@ -302,7 +302,7 @@
@Override
public FunctionId getFunctionId() {
- return FunctionId.GET_DATASET_RESULT_LOCATIONS;
+ return FunctionId.GET_RESULT_LOCATIONS;
}
public JobId getJobId() {
@@ -313,7 +313,7 @@
return rsId;
}
- public DatasetDirectoryRecord[] getKnownRecords() {
+ public ResultDirectoryRecord[] getKnownRecords() {
return knownRecords;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 1ee9bd8..63a32f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -112,9 +112,9 @@
}
@Override
- public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
- HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf =
- new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
+ public NetworkAddress getResultDirectoryAddress() throws Exception {
+ HyracksClientInterfaceFunctions.GetResultDirectoryAddressFunction gddsf =
+ new HyracksClientInterfaceFunctions.GetResultDirectoryAddressFunction();
return (NetworkAddress) rpci.call(ipcHandle, gddsf);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index eaec3c3..48c656f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -171,8 +171,8 @@
}
@Override
- public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
- return hci.getDatasetDirectoryServiceInfo();
+ public NetworkAddress getResultDirectoryAddress() throws Exception {
+ return hci.getResultDirectoryAddress();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index b3b7677..89f2ad4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -141,12 +141,12 @@
JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception;
/**
- * Gets the IP Address and port for the DatasetDirectoryService wrapped in NetworkAddress
+ * Gets the IP Address and port for the ResultDirectoryService wrapped in NetworkAddress
*
* @return {@link NetworkAddress}
* @throws Exception
*/
- NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
+ NetworkAddress getResultDirectoryAddress() throws Exception;
/**
* Waits until the specified job has completed, either successfully or has
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 6a75806..4cc47d2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -49,7 +49,7 @@
public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
- public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
+ public NetworkAddress getResultDirectoryAddress() throws Exception;
public void waitForCompletion(JobId jobId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
index 57f389f..aab6b2b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
@@ -31,18 +31,18 @@
private final NetworkAddress netAddress;
- private final NetworkAddress datasetNetworkAddress;
+ private final NetworkAddress resultNetworkAddress;
private final NetworkAddress messagingNetworkAddress;
private final int numAvailableCores;
public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress,
- NetworkAddress datasetNetworkAddress, NetworkAddress messagingNetworkAddress, int numAvailableCores) {
+ NetworkAddress resultNetworkAddress, NetworkAddress messagingNetworkAddress, int numAvailableCores) {
this.nodeId = nodeId;
this.status = status;
this.netAddress = netAddress;
- this.datasetNetworkAddress = datasetNetworkAddress;
+ this.resultNetworkAddress = resultNetworkAddress;
this.messagingNetworkAddress = messagingNetworkAddress;
this.numAvailableCores = numAvailableCores;
}
@@ -59,8 +59,8 @@
return netAddress;
}
- public NetworkAddress getDatasetNetworkAddress() {
- return datasetNetworkAddress;
+ public NetworkAddress getResultNetworkAddress() {
+ return resultNetworkAddress;
}
public NetworkAddress getMessagingNetworkAddress() {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index bf42d0c..09b6506 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -23,7 +23,7 @@
import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
+import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
@@ -43,7 +43,7 @@
ExecutorService getExecutorService();
- IDatasetPartitionManager getDatasetPartitionManager();
+ IResultPartitionManager getResultPartitionManager();
void sendApplicationMessageToCC(Serializable message, DeploymentId deploymentId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
deleted file mode 100644
index 56c0af9..0000000
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.api.dataset;
-
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.job.JobId;
-
-public interface IHyracksDatasetDirectoryServiceInterface {
- /**
- * Gets the result status for the given result set.
- *
- * @param jobId
- * ID of the job
- * @param rsId
- * ID of the result set
- * @return {@link Status}
- * @throws Exception
- */
- public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
-
- /**
- * Gets the IP Addresses and ports for the partition generating the result for each location.
- *
- * @param jobId
- * ID of the job
- * @param rsId
- * ID of the result set
- * @param knownRecords
- * Locations from the dataset directory that are already known to the client
- * @return {@link NetworkAddress[]}
- * @throws Exception
- */
- public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws Exception;
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index c4c7320..b0a8017 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -40,7 +40,7 @@
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java
index 8a3e15a..8343fe0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java
@@ -20,7 +20,7 @@
import java.io.Serializable;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.job.JobId;
public final class ResultSetPartitionId implements Serializable {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
similarity index 75%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
index 200f5e5..41b9d1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataset;
+package org.apache.hyracks.api.result;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
import org.apache.hyracks.api.job.JobId;
-public interface IHyracksDatasetDirectoryServiceConnection {
+public interface IResultDirectory {
/**
* Gets the result status for the given result set.
*
@@ -32,7 +32,7 @@
* @return {@link Status}
* @throws Exception
*/
- public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
+ Status getResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
/**
* Gets the IP Addresses and ports for the partition generating the result for each location.
@@ -43,9 +43,9 @@
* ID of the result set
* @param knownRecords
* Locations that are already known to the client
- * @return {@link NetworkAddress[]}
+ * @return {@link ResultDirectoryRecord[]}
* @throws Exception
*/
- public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws Exception;
+ ResultDirectoryRecord[] getResultLocations(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownRecords)
+ throws Exception;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultManager.java
similarity index 84%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultManager.java
index a0c1f78..d45cf44 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultManager.java
@@ -16,23 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataset;
+package org.apache.hyracks.api.result;
import java.util.Set;
import org.apache.hyracks.api.job.JobId;
-public interface IDatasetManager {
+public interface IResultManager {
Set<JobId> getJobIds();
- IDatasetStateRecord getState(JobId jobId);
+ IResultStateRecord getState(JobId jobId);
void sweep(JobId jobId);
/**
* Removes all references and deletes persisted files for
- * all expired datasets.
+ * all expired results.
*/
- void sweepExpiredDatasets();
+ void sweepExpiredResultSets();
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
similarity index 82%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
index b1e203f..a539d37 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
@@ -16,15 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataset;
+package org.apache.hyracks.api.result;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
-public interface IDatasetPartitionManager extends IDatasetManager {
- IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+public interface IResultPartitionManager extends IResultManager {
+ IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
boolean asyncMode, int partition, int nPartitions, long maxReads) throws HyracksException;
void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
@@ -32,7 +32,7 @@
void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
- void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
+ void initializeResultPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
throws HyracksException;
void removePartition(JobId jobId, ResultSetId resultSetId, int partition);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDataset.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSet.java
similarity index 82%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDataset.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSet.java
index 0fa5c75..ff2e48a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDataset.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSet.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataset;
+package org.apache.hyracks.api.result;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
-public interface IHyracksDataset {
- public IHyracksDatasetReader createReader(JobId jobId, ResultSetId resultSetId) throws HyracksDataException;
+public interface IResultSet {
+ IResultSetReader createReader(JobId jobId, ResultSetId resultSetId) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDatasetReader.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
similarity index 78%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDatasetReader.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
index 858faaa..0884c53 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IHyracksDatasetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataset;
+package org.apache.hyracks.api.result;
import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public interface IHyracksDatasetReader {
- public Status getResultStatus();
+public interface IResultSetReader {
+ Status getResultStatus();
- public int read(IFrame frame) throws HyracksDataException;
+ int read(IFrame frame) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
similarity index 87%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
index d18e6cf..fe6bc15 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
@@ -16,8 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataset;
+package org.apache.hyracks.api.result;
-public interface IDatasetStateRecord {
- public long getTimestamp();
+public interface IResultStateRecord {
+ long getTimestamp();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
similarity index 87%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
index e47b1e2..71792be 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataset;
+package org.apache.hyracks.api.result;
import java.io.Serializable;
import org.apache.hyracks.api.comm.NetworkAddress;
-public class DatasetDirectoryRecord implements Serializable {
+public class ResultDirectoryRecord implements Serializable {
public enum Status {
IDLE,
RUNNING,
@@ -40,7 +40,7 @@
private boolean empty;
- public DatasetDirectoryRecord() {
+ public ResultDirectoryRecord() {
this.address = null;
this.readEOS = false;
this.status = Status.IDLE;
@@ -82,7 +82,7 @@
status = Status.FAILED;
}
- private void updateStatus(final DatasetDirectoryRecord.Status newStatus) {
+ private void updateStatus(final ResultDirectoryRecord.Status newStatus) {
// FAILED is a stable status
if (status != Status.FAILED) {
status = newStatus;
@@ -98,10 +98,10 @@
if (o == this) {
return true;
}
- if (!(o instanceof DatasetDirectoryRecord)) {
+ if (!(o instanceof ResultDirectoryRecord)) {
return false;
}
- return address.equals(((DatasetDirectoryRecord) o).address);
+ return address.equals(((ResultDirectoryRecord) o).address);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
similarity index 85%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index 4e7ddda..b8ddbd2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataset;
+package org.apache.hyracks.api.result;
import java.io.Serializable;
import java.util.ArrayList;
@@ -28,7 +28,7 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class DatasetJobRecord implements IDatasetStateRecord {
+public class ResultJobRecord implements IResultStateRecord {
public enum State {
IDLE,
RUNNING,
@@ -83,7 +83,7 @@
private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>();
- public DatasetJobRecord() {
+ public ResultJobRecord() {
this.timestamp = System.nanoTime();
this.status = new Status();
}
@@ -145,17 +145,17 @@
return resultSetMetadataMap.get(rsId);
}
- public synchronized DatasetDirectoryRecord getOrCreateDirectoryRecord(ResultSetId rsId, int partition) {
- DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+ public synchronized ResultDirectoryRecord getOrCreateDirectoryRecord(ResultSetId rsId, int partition) {
+ ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
if (records[partition] == null) {
- records[partition] = new DatasetDirectoryRecord();
+ records[partition] = new ResultDirectoryRecord();
}
return records[partition];
}
- public synchronized DatasetDirectoryRecord getDirectoryRecord(ResultSetId rsId, int partition)
+ public synchronized ResultDirectoryRecord getDirectoryRecord(ResultSetId rsId, int partition)
throws HyracksDataException {
- DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+ ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
if (records[partition] == null) {
throw HyracksDataException.create(ErrorCode.RESULT_NO_RECORD, partition, rsId);
}
@@ -164,9 +164,9 @@
public synchronized void updateState(ResultSetId rsId) {
int successCount = 0;
- DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
- for (DatasetDirectoryRecord record : records) {
- if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) {
+ ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+ for (ResultDirectoryRecord record : records) {
+ if ((record != null) && (record.getStatus() == ResultDirectoryRecord.Status.SUCCESS)) {
successCount++;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetId.java
similarity index 97%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetId.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetId.java
index 1e21e19..ffd4ced 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetId.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataset;
+package org.apache.hyracks.api.result;
import java.io.Serializable;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
similarity index 86%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
index 8e9e3dc..b7b8f1c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
@@ -16,16 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataset;
+package org.apache.hyracks.api.result;
import java.util.Arrays;
public class ResultSetMetaData {
- private final DatasetDirectoryRecord[] records;
+ private final ResultDirectoryRecord[] records;
private final boolean ordered;
ResultSetMetaData(int len, boolean ordered) {
- this.records = new DatasetDirectoryRecord[len];
+ this.records = new ResultDirectoryRecord[len];
this.ordered = ordered;
}
@@ -33,7 +33,7 @@
return ordered;
}
- public DatasetDirectoryRecord[] getRecords() {
+ public ResultDirectoryRecord[] getRecords() {
return records;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
deleted file mode 100644
index 7f549ca..0000000
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.client.dataset;
-
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
-import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.client.net.ClientNetworkManager;
-import org.apache.hyracks.control.nc.resources.memory.FrameManager;
-
-public class HyracksDataset implements IHyracksDataset {
- private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
-
- private final ClientNetworkManager netManager;
-
- private final IHyracksCommonContext datasetClientCtx;
-
- public HyracksDataset(IHyracksClientConnection hcc, int frameSize, int nReaders) throws Exception {
- NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo();
- datasetDirectoryServiceConnection =
- new HyracksDatasetDirectoryServiceConnection(ddsAddress.getAddress(), ddsAddress.getPort());
-
- netManager = new ClientNetworkManager(nReaders);
- netManager.start();
-
- datasetClientCtx = new DatasetClientContext(frameSize);
- }
-
- @Override
- public IHyracksDatasetReader createReader(JobId jobId, ResultSetId resultSetId) throws HyracksDataException {
- IHyracksDatasetReader reader = null;
- try {
- reader = new HyracksDatasetReader(datasetDirectoryServiceConnection, netManager, datasetClientCtx, jobId,
- resultSetId);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- return reader;
- }
-
- static class DatasetClientContext extends FrameManager implements IHyracksCommonContext {
-
- DatasetClientContext(int frameSize) {
- super(frameSize);
- }
-
- @Override
- public IIOManager getIoManager() {
- return null;
- }
- }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
deleted file mode 100644
index 63139d9..0000000
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.client.dataset;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
-import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.api.RPCInterface;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
-
-//TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client?
-public class HyracksDatasetDirectoryServiceConnection implements IHyracksDatasetDirectoryServiceConnection {
- private final IPCSystem ipc;
- private final IHyracksDatasetDirectoryServiceInterface ddsi;
-
- public HyracksDatasetDirectoryServiceConnection(String ddsHost, int ddsPort) throws Exception {
- RPCInterface rpci = new RPCInterface();
- ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
- ipc.start();
- IIPCHandle ddsIpchandle = ipc.getReconnectingHandle(new InetSocketAddress(ddsHost, ddsPort));
- this.ddsi = new HyracksDatasetDirectoryServiceInterfaceRemoteProxy(ddsIpchandle, rpci);
- }
-
- @Override
- public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception {
- return ddsi.getDatasetResultStatus(jobId, rsId);
- }
-
- @Override
- public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws Exception {
- return ddsi.getDatasetResultLocations(jobId, rsId, knownRecords);
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
deleted file mode 100644
index 7eeb913..0000000
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.client.dataset;
-
-import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
-import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.api.RPCInterface;
-
-//TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client?
-public class HyracksDatasetDirectoryServiceInterfaceRemoteProxy implements IHyracksDatasetDirectoryServiceInterface {
- private final IIPCHandle ipcHandle;
-
- private final RPCInterface rpci;
-
- public HyracksDatasetDirectoryServiceInterfaceRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) {
- this.ipcHandle = ipcHandle;
- this.rpci = rpci;
- }
-
- @Override
- public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception {
- HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
- new HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction(jobId, rsId);
- return (Status) rpci.call(ipcHandle, gdrlf);
- }
-
- @Override
- public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws Exception {
- HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
- new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(jobId, rsId, knownRecords);
- return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
new file mode 100644
index 0000000..6f8c4d0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.client.result;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.RPCInterface;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+//TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client?
+public class ResultDirectory implements IResultDirectory {
+ private final IPCSystem ipc;
+ private final IResultDirectory remoteResultDirectory;
+
+ public ResultDirectory(String resultHost, int resultPort) throws Exception {
+ RPCInterface rpci = new RPCInterface();
+ ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
+ ipc.start();
+ IIPCHandle ddsIpchandle = ipc.getReconnectingHandle(new InetSocketAddress(resultHost, resultPort));
+ this.remoteResultDirectory = new ResultDirectoryRemoteProxy(ddsIpchandle, rpci);
+ }
+
+ @Override
+ public Status getResultStatus(JobId jobId, ResultSetId rsId) throws Exception {
+ return remoteResultDirectory.getResultStatus(jobId, rsId);
+ }
+
+ @Override
+ public ResultDirectoryRecord[] getResultLocations(JobId jobId, ResultSetId rsId,
+ ResultDirectoryRecord[] knownRecords) throws Exception {
+ return remoteResultDirectory.getResultLocations(jobId, rsId, knownRecords);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
new file mode 100644
index 0000000..77c6e4b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.client.result;
+
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.RPCInterface;
+
+//TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client?
+public class ResultDirectoryRemoteProxy implements IResultDirectory {
+ private final IIPCHandle ipcHandle;
+
+ private final RPCInterface rpci;
+
+ public ResultDirectoryRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) {
+ this.ipcHandle = ipcHandle;
+ this.rpci = rpci;
+ }
+
+ @Override
+ public Status getResultStatus(JobId jobId, ResultSetId rsId) throws Exception {
+ HyracksClientInterfaceFunctions.GetResultStatusFunction gdrlf =
+ new HyracksClientInterfaceFunctions.GetResultStatusFunction(jobId, rsId);
+ return (Status) rpci.call(ipcHandle, gdrlf);
+ }
+
+ @Override
+ public ResultDirectoryRecord[] getResultLocations(JobId jobId, ResultSetId rsId,
+ ResultDirectoryRecord[] knownRecords) throws Exception {
+ HyracksClientInterfaceFunctions.GetResultLocationsFunction gdrlf =
+ new HyracksClientInterfaceFunctions.GetResultLocationsFunction(jobId, rsId, knownRecords);
+ return (ResultDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
new file mode 100644
index 0000000..ef93cce
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.client.result;
+
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.IResultSetReader;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.client.net.ClientNetworkManager;
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+
+public class ResultSet implements IResultSet {
+ private final IResultDirectory resultDirectory;
+
+ private final ClientNetworkManager netManager;
+
+ private final IHyracksCommonContext resultClientCtx;
+
+ public ResultSet(IHyracksClientConnection hcc, int frameSize, int nReaders) throws Exception {
+ NetworkAddress ddsAddress = hcc.getResultDirectoryAddress();
+ resultDirectory = new ResultDirectory(ddsAddress.getAddress(), ddsAddress.getPort());
+
+ netManager = new ClientNetworkManager(nReaders);
+ netManager.start();
+
+ resultClientCtx = new ResultClientContext(frameSize);
+ }
+
+ @Override
+ public IResultSetReader createReader(JobId jobId, ResultSetId resultSetId) throws HyracksDataException {
+ IResultSetReader reader = null;
+ try {
+ reader = new ResultSetReader(resultDirectory, netManager, resultClientCtx, jobId, resultSetId);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ return reader;
+ }
+
+ static class ResultClientContext extends FrameManager implements IHyracksCommonContext {
+
+ ResultClientContext(int frameSize) {
+ super(frameSize);
+ }
+
+ @Override
+ public IIOManager getIoManager() {
+ return null;
+ }
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
similarity index 75%
rename from hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
rename to hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
index b1566f4..092d860 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.client.dataset;
+package org.apache.hyracks.client.result;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -30,43 +30,42 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
-import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.IResultSetReader;
+import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.client.net.ClientNetworkManager;
-import org.apache.hyracks.comm.channels.DatasetNetworkInputChannel;
+import org.apache.hyracks.comm.channels.ResultNetworkInputChannel;
import org.apache.hyracks.util.annotations.NotThreadSafe;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@NotThreadSafe
-public class HyracksDatasetReader implements IHyracksDatasetReader {
+public class ResultSetReader implements IResultSetReader {
private static final Logger LOGGER = LogManager.getLogger();
private static final int NUM_READ_BUFFERS = 1;
- private final IHyracksDatasetDirectoryServiceConnection datasetDirectory;
+ private final IResultDirectory resultDirectory;
private final ClientNetworkManager netManager;
- private final IHyracksCommonContext datasetClientCtx;
+ private final IHyracksCommonContext resultClientCtx;
private final JobId jobId;
private final ResultSetId resultSetId;
- private DatasetDirectoryRecord[] knownRecords;
- private DatasetInputChannelMonitor[] monitors;
- private DatasetInputChannelMonitor currentRecordMonitor;
- private DatasetNetworkInputChannel currentRecordChannel;
+ private ResultDirectoryRecord[] knownRecords;
+ private ResultInputChannelMonitor[] monitors;
+ private ResultInputChannelMonitor currentRecordMonitor;
+ private ResultNetworkInputChannel currentRecordChannel;
private int currentRecord;
- public HyracksDatasetReader(IHyracksDatasetDirectoryServiceConnection datasetDirectory,
- ClientNetworkManager netManager, IHyracksCommonContext datasetClientCtx, JobId jobId,
- ResultSetId resultSetId) {
- this.datasetDirectory = datasetDirectory;
+ public ResultSetReader(IResultDirectory resultDirectory, ClientNetworkManager netManager,
+ IHyracksCommonContext resultClientCtx, JobId jobId, ResultSetId resultSetId) {
+ this.resultDirectory = resultDirectory;
this.netManager = netManager;
- this.datasetClientCtx = datasetClientCtx;
+ this.resultClientCtx = resultClientCtx;
this.jobId = jobId;
this.resultSetId = resultSetId;
currentRecord = -1;
@@ -75,7 +74,7 @@
@Override
public Status getResultStatus() {
try {
- return datasetDirectory.getDatasetResultStatus(jobId, resultSetId);
+ return resultDirectory.getResultStatus(jobId, resultSetId);
} catch (HyracksDataException e) {
if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) {
LOGGER.log(Level.WARN, "Exception retrieving result set for job " + jobId, e);
@@ -93,7 +92,7 @@
if (isFirstRead() && !hasNextRecord()) {
return readSize;
}
- // read until frame is full or all dataset records have been read
+ // read until frame is full or all result records have been read
while (readSize < frame.getFrameSize()) {
if (currentRecordMonitor.hasMoreFrames()) {
final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer();
@@ -123,7 +122,7 @@
return readSize;
}
- private SocketAddress getSocketAddress(DatasetDirectoryRecord record) throws HyracksDataException {
+ private SocketAddress getSocketAddress(ResultDirectoryRecord record) throws HyracksDataException {
try {
final NetworkAddress netAddr = record.getNetworkAddress();
return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort());
@@ -132,22 +131,22 @@
}
}
- private DatasetInputChannelMonitor getMonitor(int partition) {
+ private ResultInputChannelMonitor getMonitor(int partition) {
if (knownRecords == null || knownRecords[partition] == null) {
throw new IllegalStateException("Accessing monitors before obtaining the corresponding addresses");
}
if (monitors == null) {
- monitors = new DatasetInputChannelMonitor[knownRecords.length];
+ monitors = new ResultInputChannelMonitor[knownRecords.length];
}
if (monitors[partition] == null) {
- monitors[partition] = new DatasetInputChannelMonitor();
+ monitors[partition] = new ResultInputChannelMonitor();
}
return monitors[partition];
}
private boolean hasNextRecord() throws HyracksDataException {
currentRecord++;
- DatasetDirectoryRecord record = getRecord(currentRecord);
+ ResultDirectoryRecord record = getRecord(currentRecord);
// skip empty records
while (record.isEmpty() && ++currentRecord < knownRecords.length) {
record = getRecord(currentRecord);
@@ -160,10 +159,10 @@
return true;
}
- private DatasetDirectoryRecord getRecord(int recordNum) throws HyracksDataException {
+ private ResultDirectoryRecord getRecord(int recordNum) throws HyracksDataException {
try {
while (knownRecords == null || knownRecords[recordNum] == null) {
- knownRecords = datasetDirectory.getDatasetResultLocations(jobId, resultSetId, knownRecords);
+ knownRecords = resultDirectory.getResultLocations(jobId, resultSetId, knownRecords);
}
return knownRecords[recordNum];
} catch (Exception e) {
@@ -171,12 +170,12 @@
}
}
- private void requestRecordData(DatasetDirectoryRecord record) throws HyracksDataException {
- currentRecordChannel = new DatasetNetworkInputChannel(netManager, getSocketAddress(record), jobId, resultSetId,
+ private void requestRecordData(ResultDirectoryRecord record) throws HyracksDataException {
+ currentRecordChannel = new ResultNetworkInputChannel(netManager, getSocketAddress(record), jobId, resultSetId,
currentRecord, NUM_READ_BUFFERS);
currentRecordMonitor = getMonitor(currentRecord);
currentRecordChannel.registerMonitor(currentRecordMonitor);
- currentRecordChannel.open(datasetClientCtx);
+ currentRecordChannel.open(resultClientCtx);
}
private boolean isFirstRead() {
@@ -187,13 +186,13 @@
return knownRecords != null && currentRecord == knownRecords.length - 1;
}
- private static class DatasetInputChannelMonitor implements IInputChannelMonitor {
+ private static class ResultInputChannelMonitor implements IInputChannelMonitor {
private int availableFrames;
private boolean eos;
private boolean failed;
- DatasetInputChannelMonitor() {
+ ResultInputChannelMonitor() {
eos = false;
failed = false;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
similarity index 91%
rename from hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
rename to hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
index 0f96a6e..37540cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
@@ -29,13 +29,13 @@
import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class DatasetNetworkInputChannel implements IInputChannel {
+public class ResultNetworkInputChannel implements IInputChannel {
private static final Logger LOGGER = LogManager.getLogger();
static final int INITIAL_MESSAGE_SIZE = 20;
@@ -60,7 +60,7 @@
private Object attachment;
- public DatasetNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId,
+ public ResultNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId,
ResultSetId resultSetId, int partition, int nBuffers) {
this.netManager = netManager;
this.remoteAddress = remoteAddress;
@@ -131,17 +131,17 @@
@Override
public void accept(ByteBuffer buffer) {
fullQueue.add(buffer);
- monitor.notifyDataAvailability(DatasetNetworkInputChannel.this, 1);
+ monitor.notifyDataAvailability(ResultNetworkInputChannel.this, 1);
}
@Override
public void close() {
- monitor.notifyEndOfStream(DatasetNetworkInputChannel.this);
+ monitor.notifyEndOfStream(ResultNetworkInputChannel.this);
}
@Override
public void error(int ecode) {
- monitor.notifyFailure(DatasetNetworkInputChannel.this, ecode);
+ monitor.notifyFailure(ResultNetworkInputChannel.this, ecode);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index a669402..2edbab8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -20,21 +20,21 @@
import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.DeployedJobSpecIdFactory;
import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
import org.apache.hyracks.control.cc.work.CancelJobWork;
import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
import org.apache.hyracks.control.cc.work.DeployJobSpecWork;
-import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import org.apache.hyracks.control.cc.work.GetJobInfoWork;
import org.apache.hyracks.control.cc.work.GetJobStatusWork;
import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
+import org.apache.hyracks.control.cc.work.GetResultDirectoryAddressWork;
import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
import org.apache.hyracks.control.cc.work.GetResultStatusWork;
import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
@@ -121,19 +121,19 @@
ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
jobIdFactory, sjf.getJobParameters(), new IPCResponder<>(handle, mid), id));
break;
- case GET_DATASET_DIRECTORY_SERIVICE_INFO:
+ case GET_RESULT_DIRECTORY_ADDRESS:
ccs.getWorkQueue().schedule(
- new GetDatasetDirectoryServiceInfoWork(ccs, new IPCResponder<NetworkAddress>(handle, mid)));
+ new GetResultDirectoryAddressWork(ccs, new IPCResponder<NetworkAddress>(handle, mid)));
break;
- case GET_DATASET_RESULT_STATUS:
- HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrsf =
- (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+ case GET_RESULT_STATUS:
+ HyracksClientInterfaceFunctions.GetResultStatusFunction gdrsf =
+ (HyracksClientInterfaceFunctions.GetResultStatusFunction) fn;
ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, gdrsf.getJobId(), gdrsf.getResultSetId(),
new IPCResponder<Status>(handle, mid)));
break;
- case GET_DATASET_RESULT_LOCATIONS:
- HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
- (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+ case GET_RESULT_LOCATIONS:
+ HyracksClientInterfaceFunctions.GetResultLocationsFunction gdrlf =
+ (HyracksClientInterfaceFunctions.GetResultLocationsFunction) fn;
ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs, gdrlf.getJobId(),
gdrlf.getResultSetId(), gdrlf.getKnownRecords(), new IPCResponder<>(handle, mid)));
break;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 153a32d..8ed015c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -55,10 +55,10 @@
import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.cc.cluster.NodeManager;
-import org.apache.hyracks.control.cc.dataset.DatasetDirectoryService;
-import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobManager;
+import org.apache.hyracks.control.cc.result.IResultDirectoryService;
+import org.apache.hyracks.control.cc.result.ResultDirectoryService;
import org.apache.hyracks.control.cc.scheduler.IResourceManager;
import org.apache.hyracks.control.cc.scheduler.ResourceManager;
import org.apache.hyracks.control.cc.web.WebServer;
@@ -122,7 +122,7 @@
private final DeadNodeSweeper sweeper;
- private final IDatasetDirectoryService datasetDirectoryService;
+ private final IResultDirectoryService resultDirectoryService;
private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
@@ -171,8 +171,8 @@
final ClusterTopology topology = computeClusterTopology(ccConfig);
ccContext = new ClusterControllerContext(topology);
sweeper = new DeadNodeSweeper();
- datasetDirectoryService =
- new DatasetDirectoryService(ccConfig.getResultTTL(), ccConfig.getResultSweepThreshold());
+ resultDirectoryService =
+ new ResultDirectoryService(ccConfig.getResultTTL(), ccConfig.getResultSweepThreshold());
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
@@ -221,7 +221,7 @@
jobLog.open();
startApplication();
- datasetDirectoryService.init(executor);
+ resultDirectoryService.init(executor);
workQueue.start();
connectNCs();
LOGGER.log(Level.INFO, "Started ClusterControllerService");
@@ -230,7 +230,7 @@
private void startApplication() throws Exception {
serviceCtx = new CCServiceContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
- serviceCtx.addJobLifecycleListener(datasetDirectoryService);
+ serviceCtx.addJobLifecycleListener(resultDirectoryService);
application.init(serviceCtx);
executor = MaintainedThreadNameExecutorService.newCachedThreadPool(serviceCtx.getThreadFactory());
application.start(ccConfig.getAppArgsArray());
@@ -408,7 +408,7 @@
return clusterIPC;
}
- public NetworkAddress getDatasetDirectoryServiceInfo() {
+ public NetworkAddress getResultDirectoryAddress() {
return new NetworkAddress(ccConfig.getClientPublicAddress(), ccConfig.getClientPublicPort());
}
@@ -461,8 +461,8 @@
}
}
- public IDatasetDirectoryService getDatasetDirectoryService() {
- return datasetDirectoryService;
+ public IResultDirectoryService getResultDirectoryService() {
+ return resultDirectoryService;
}
public synchronized void addStateDumpRun(String id, StateDumpRun sdr) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 2b03324..31f989b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -144,7 +144,7 @@
Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
nodeRegistry.forEach(
(key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataPort(),
- ncState.getDatasetPort(), ncState.getMessagingPort(), ncState.getCapacity().getCores())));
+ ncState.getResultPort(), ncState.getMessagingPort(), ncState.getCapacity().getCores())));
return result;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
similarity index 77%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
index 68d6c16..c8f576c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
@@ -16,22 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.cc.dataset;
+package org.apache.hyracks.control.cc.result;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.dataset.IDatasetManager;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.IResultManager;
+import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.common.work.IResultCallback;
-public interface IDatasetDirectoryService extends IJobLifecycleListener, IDatasetManager {
+public interface IResultDirectoryService extends IJobLifecycleListener, IResultManager {
public void init(ExecutorService executor);
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
@@ -45,6 +45,6 @@
public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
- public void getResultPartitionLocations(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownLocations,
- IResultCallback<DatasetDirectoryRecord[]> callback) throws HyracksDataException;
+ public void getResultPartitionLocations(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownLocations,
+ IResultCallback<ResultDirectoryRecord[]> callback) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
similarity index 73%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 04aaddd..a65ce4c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.cc.dataset;
+package org.apache.hyracks.control.cc.result;
import java.io.PrintWriter;
import java.util.Arrays;
@@ -28,33 +28,33 @@
import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
-import org.apache.hyracks.api.dataset.DatasetJobRecord;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.State;
-import org.apache.hyracks.api.dataset.IDatasetStateRecord;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.dataset.ResultSetMetaData;
+import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultJobRecord;
+import org.apache.hyracks.api.result.ResultJobRecord.State;
+import org.apache.hyracks.api.result.IResultStateRecord;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetMetaData;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.control.common.dataset.AbstractDatasetManager;
-import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
+import org.apache.hyracks.control.common.result.AbstractResultManager;
+import org.apache.hyracks.control.common.result.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
- * TODO(madhusudancs): The potential perils of this global dataset directory service implementation is that, the jobs
+ * TODO(madhusudancs): The potential perils of this global result directory service implementation is that, the jobs
* location information is never evicted from the memory and the memory usage grows as the number of jobs in the system
* grows. What we should possibly do is, add an API call for the client to say that it received everything it has to for
* the job (after it receives all the results) completely. Then we can just get rid of the location information for that
* job.
*/
-public class DatasetDirectoryService extends AbstractDatasetManager implements IDatasetDirectoryService {
+public class ResultDirectoryService extends AbstractResultManager implements IResultDirectoryService {
private static final Logger LOGGER = LogManager.getLogger();
@@ -62,7 +62,7 @@
private final Map<JobId, JobResultInfo> jobResultLocations;
- public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
+ public ResultDirectoryService(long resultTTL, long resultSweepThreshold) {
super(resultTTL);
this.resultSweepThreshold = resultSweepThreshold;
jobResultLocations = new LinkedHashMap<>();
@@ -81,7 +81,7 @@
if (jobResultLocations.get(jobId) != null) {
throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId);
}
- jobResultLocations.put(jobId, new JobResultInfo(new DatasetJobRecord(), null));
+ jobResultLocations.put(jobId, new JobResultInfo(new ResultJobRecord(), null));
}
@Override
@@ -94,13 +94,13 @@
// Auto-generated method stub
}
- private DatasetJobRecord getDatasetJobRecord(JobId jobId) {
+ private ResultJobRecord getResultJobRecord(JobId jobId) {
final JobResultInfo jri = jobResultLocations.get(jobId);
return jri == null ? null : jri.getRecord();
}
- private DatasetJobRecord getNonNullDatasetJobRecord(JobId jobId) throws HyracksDataException {
- final DatasetJobRecord djr = getDatasetJobRecord(jobId);
+ private ResultJobRecord getNonNullResultJobRecord(JobId jobId) throws HyracksDataException {
+ final ResultJobRecord djr = getResultJobRecord(jobId);
if (djr == null) {
throw HyracksDataException.create(ErrorCode.NO_RESULT_SET, jobId);
}
@@ -111,9 +111,9 @@
public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress)
throws HyracksDataException {
- DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+ ResultJobRecord djr = getNonNullResultJobRecord(jobId);
djr.setResultSetMetaData(rsId, orderedResult, nPartitions);
- DatasetDirectoryRecord record = djr.getOrCreateDirectoryRecord(rsId, partition);
+ ResultDirectoryRecord record = djr.getOrCreateDirectoryRecord(rsId, partition);
record.setNetworkAddress(networkAddress);
record.setEmpty(emptyResult);
@@ -123,7 +123,7 @@
Waiter waiter = jobResultInfo.getWaiter(rsId);
if (waiter != null) {
try {
- DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, waiter.knownRecords);
+ ResultDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, waiter.knownRecords);
if (updatedRecords != null) {
jobResultInfo.removeWaiter(rsId);
waiter.callback.setValue(updatedRecords);
@@ -138,7 +138,7 @@
@Override
public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition)
throws HyracksDataException {
- DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+ ResultJobRecord djr = getNonNullResultJobRecord(jobId);
djr.getDirectoryRecord(rsId, partition).writeEOS();
djr.updateState(rsId);
notifyAll();
@@ -148,11 +148,11 @@
public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
LOGGER.log(Level.INFO, "job " + jobId + " failed and is being reported to " + getClass().getSimpleName(),
exceptions.get(0));
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
- LOGGER.log(Level.INFO, "Dataset job record is " + djr);
- if (djr != null) {
- LOGGER.log(Level.INFO, "Setting exceptions in Dataset job record");
- djr.fail(exceptions);
+ ResultJobRecord rjr = getResultJobRecord(jobId);
+ LOGGER.log(Level.INFO, "Result job record is " + rjr);
+ if (rjr != null) {
+ LOGGER.log(Level.INFO, "Setting exceptions in Result job record");
+ rjr.fail(exceptions);
}
final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
LOGGER.log(Level.INFO, "Job result info is " + jobResultInfo);
@@ -164,9 +164,9 @@
}
@Override
- public synchronized DatasetJobRecord.Status getResultStatus(JobId jobId, ResultSetId rsId)
+ public synchronized ResultJobRecord.Status getResultStatus(JobId jobId, ResultSetId rsId)
throws HyracksDataException {
- return getNonNullDatasetJobRecord(jobId).getStatus();
+ return getNonNullResultJobRecord(jobId).getStatus();
}
@Override
@@ -175,8 +175,8 @@
}
@Override
- public IDatasetStateRecord getState(JobId jobId) {
- return getDatasetJobRecord(jobId);
+ public IResultStateRecord getState(JobId jobId) {
+ return getResultJobRecord(jobId);
}
@Override
@@ -186,9 +186,9 @@
@Override
public synchronized void getResultPartitionLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback)
+ ResultDirectoryRecord[] knownRecords, IResultCallback<ResultDirectoryRecord[]> callback)
throws HyracksDataException {
- DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, knownRecords);
+ ResultDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, knownRecords);
if (updatedRecords == null) {
jobResultLocations.get(jobId).addWaiter(rsId, knownRecords, callback);
} else {
@@ -198,7 +198,7 @@
/**
* Compares the records already known by the client for the given job's result set id with the records that the
- * dataset directory service knows and if there are any newly discovered records returns a whole array with the
+ * result directory service knows and if there are any newly discovered records returns a whole array with the
* new records filled in.
*
* @param jobId
@@ -210,12 +210,12 @@
* @return
* Returns the updated records if new record were discovered, null otherwise
* @throws HyracksDataException
- * TODO(madhusudancs): Think about caching (and still be stateless) instead of this ugly O(n) iterations for
- * every check. This already looks very expensive.
+ * TODO(madhusudancs): Think about caching (and still be stateless) instead of this ugly O(n)
+ * iterations for every check. This already looks very expensive.
*/
- private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
- DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+ private ResultDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownRecords)
+ throws HyracksDataException {
+ ResultJobRecord djr = getNonNullResultJobRecord(jobId);
if (djr.getStatus().getState() == State.FAILED) {
List<Exception> caughtExceptions = djr.getStatus().getExceptions();
@@ -234,7 +234,7 @@
if (resultSetMetaData == null) {
return null;
}
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+ ResultDirectoryRecord[] records = resultSetMetaData.getRecords();
return Arrays.equals(records, knownRecords) ? null : records;
}
@@ -243,7 +243,7 @@
for (JobId jId : getJobIds()) {
pw.print(jId.toString());
pw.print(" - ");
- pw.println(String.valueOf(getDatasetJobRecord(jId)));
+ pw.println(String.valueOf(getResultJobRecord(jId)));
}
pw.flush();
return pw;
@@ -252,21 +252,21 @@
class JobResultInfo {
- private DatasetJobRecord record;
+ private ResultJobRecord record;
private Waiters waiters;
private Exception exception;
- JobResultInfo(DatasetJobRecord record, Waiters waiters) {
+ JobResultInfo(ResultJobRecord record, Waiters waiters) {
this.record = record;
this.waiters = waiters;
}
- DatasetJobRecord getRecord() {
+ ResultJobRecord getRecord() {
return record;
}
- void addWaiter(ResultSetId rsId, DatasetDirectoryRecord[] knownRecords,
- IResultCallback<DatasetDirectoryRecord[]> callback) {
+ void addWaiter(ResultSetId rsId, ResultDirectoryRecord[] knownRecords,
+ IResultCallback<ResultDirectoryRecord[]> callback) {
if (waiters == null) {
waiters = new Waiters();
}
@@ -306,10 +306,10 @@
}
class Waiter {
- DatasetDirectoryRecord[] knownRecords;
- IResultCallback<DatasetDirectoryRecord[]> callback;
+ ResultDirectoryRecord[] knownRecords;
+ IResultCallback<ResultDirectoryRecord[]> callback;
- Waiter(DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
+ Waiter(ResultDirectoryRecord[] knownRecords, IResultCallback<ResultDirectoryRecord[]> callback) {
this.knownRecords = knownRecords;
this.callback = callback;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
similarity index 84%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
index fb1febf..bf95ff2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
@@ -23,12 +23,12 @@
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
-public class GetDatasetDirectoryServiceInfoWork extends SynchronizableWork {
+public class GetResultDirectoryAddressWork extends SynchronizableWork {
private final ClusterControllerService ccs;
private final IResultCallback<NetworkAddress> callback;
- public GetDatasetDirectoryServiceInfoWork(ClusterControllerService ccs, IResultCallback<NetworkAddress> callback) {
+ public GetResultDirectoryAddressWork(ClusterControllerService ccs, IResultCallback<NetworkAddress> callback) {
this.ccs = ccs;
this.callback = callback;
}
@@ -36,7 +36,7 @@
@Override
public void doRun() {
try {
- NetworkAddress addr = ccs.getDatasetDirectoryServiceInfo();
+ NetworkAddress addr = ccs.getResultDirectoryAddress();
callback.setValue(addr);
} catch (Exception e) {
callback.setException(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
index ce82ec8..68382f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -20,12 +20,12 @@
import java.util.Arrays;
-import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
+import org.apache.hyracks.control.cc.result.IResultDirectoryService;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -36,12 +36,12 @@
private final ResultSetId rsId;
- private final DatasetDirectoryRecord[] knownRecords;
+ private final ResultDirectoryRecord[] knownRecords;
- private final IResultCallback<DatasetDirectoryRecord[]> callback;
+ private final IResultCallback<ResultDirectoryRecord[]> callback;
public GetResultPartitionLocationsWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
+ ResultDirectoryRecord[] knownRecords, IResultCallback<ResultDirectoryRecord[]> callback) {
this.ccs = ccs;
this.jobId = jobId;
this.rsId = rsId;
@@ -51,7 +51,7 @@
@Override
public void doRun() {
- final IDatasetDirectoryService dds = ccs.getDatasetDirectoryService();
+ final IResultDirectoryService dds = ccs.getResultDirectoryService();
ccs.getExecutor().execute(new Runnable() {
@Override
public void run() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultStatusWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultStatusWork.java
index 676276a..560f505 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultStatusWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultStatusWork.java
@@ -18,8 +18,8 @@
*/
package org.apache.hyracks.control.cc.work;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -46,7 +46,7 @@
@Override
public void doRun() {
try {
- Status status = ccs.getDatasetDirectoryService().getResultStatus(jobId, rsId);
+ Status status = ccs.getResultDirectoryService().getResultStatus(jobId, rsId);
callback.setValue(status);
} catch (HyracksDataException e) {
callback.setException(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index ad36701..98fe722 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -22,7 +22,7 @@
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -68,7 +68,7 @@
@Override
public void run() {
try {
- ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
+ ccs.getResultDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
partition, nPartitions, networkAddress);
} catch (HyracksDataException e) {
LOGGER.log(Level.WARN, "Failed to register partition location", e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
index d63bc8a..b36c917 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.control.cc.work;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -44,7 +44,7 @@
@Override
public void run() {
try {
- ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+ ccs.getResultDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
} catch (HyracksDataException e) {
throw new RuntimeException(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index e7af300..8c86aff 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -48,7 +48,7 @@
if (run == null) {
return;
}
- ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
+ ccs.getResultDirectoryService().reportJobFailure(jobId, exceptions);
run.getExecutor().notifyTaskFailure(ta, exceptions);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index cc2ed46..43b00f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -172,7 +172,7 @@
NetworkAddress msgAddr = new NetworkAddress(ipAddr, 1003);
when(ncState.getCapacity()).thenReturn(new NodeCapacity(NODE_MEMORY_SIZE, NODE_CORES));
when(ncState.getDataPort()).thenReturn(dataAddr);
- when(ncState.getDatasetPort()).thenReturn(resultAddr);
+ when(ncState.getResultPort()).thenReturn(resultAddr);
when(ncState.getMessagingPort()).thenReturn(msgAddr);
NCConfig ncConfig = new NCConfig(nodeId);
ncConfig.setDataPublicAddress(ipAddr);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
index ddab504..1926da6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
@@ -46,7 +46,7 @@
private final NetworkAddress dataPort;
- private final NetworkAddress datasetPort;
+ private final NetworkAddress resultPort;
private final NetworkAddress messagingPort;
@@ -118,13 +118,13 @@
private final long[] netSignalingBytesWritten;
- private final long[] datasetNetPayloadBytesRead;
+ private final long[] resultNetPayloadBytesRead;
- private final long[] datasetNetPayloadBytesWritten;
+ private final long[] resultNetPayloadBytesWritten;
- private final long[] datasetNetSignalingBytesRead;
+ private final long[] resultNetSignalingBytesRead;
- private final long[] datasetNetSignalingBytesWritten;
+ private final long[] resultNetSignalingBytesWritten;
private final long[] ipcMessagesSent;
@@ -147,7 +147,7 @@
public NodeControllerData(NodeRegistration reg) {
ncConfig = reg.getNCConfig();
dataPort = reg.getDataPort();
- datasetPort = reg.getDatasetPort();
+ resultPort = reg.getResultPort();
messagingPort = reg.getMessagingPort();
activeJobIds = new HashSet<>();
@@ -191,10 +191,10 @@
netPayloadBytesWritten = new long[RRD_SIZE];
netSignalingBytesRead = new long[RRD_SIZE];
netSignalingBytesWritten = new long[RRD_SIZE];
- datasetNetPayloadBytesRead = new long[RRD_SIZE];
- datasetNetPayloadBytesWritten = new long[RRD_SIZE];
- datasetNetSignalingBytesRead = new long[RRD_SIZE];
- datasetNetSignalingBytesWritten = new long[RRD_SIZE];
+ resultNetPayloadBytesRead = new long[RRD_SIZE];
+ resultNetPayloadBytesWritten = new long[RRD_SIZE];
+ resultNetSignalingBytesRead = new long[RRD_SIZE];
+ resultNetSignalingBytesWritten = new long[RRD_SIZE];
ipcMessagesSent = new long[RRD_SIZE];
ipcMessageBytesSent = new long[RRD_SIZE];
ipcMessagesReceived = new long[RRD_SIZE];
@@ -231,10 +231,10 @@
netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
- datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
- datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
- datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
- datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
+ resultNetPayloadBytesRead[rrdPtr] = hbData.resultNetPayloadBytesRead;
+ resultNetPayloadBytesWritten[rrdPtr] = hbData.resultNetPayloadBytesWritten;
+ resultNetSignalingBytesRead[rrdPtr] = hbData.resultNetSignalingBytesRead;
+ resultNetSignalingBytesWritten[rrdPtr] = hbData.resultNetSignalingBytesWritten;
ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
@@ -264,8 +264,8 @@
return dataPort;
}
- public NetworkAddress getDatasetPort() {
- return datasetPort;
+ public NetworkAddress getResultPort() {
+ return resultPort;
}
public NetworkAddress getMessagingPort() {
@@ -332,10 +332,10 @@
put(o, "net-payload-bytes-written", netPayloadBytesWritten);
put(o, "net-signaling-bytes-read", netSignalingBytesRead);
put(o, "net-signaling-bytes-written", netSignalingBytesWritten);
- put(o, "dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
- put(o, "dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
- put(o, "dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
- put(o, "dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
+ put(o, "result-net-payload-bytes-read", resultNetPayloadBytesRead);
+ put(o, "result-net-payload-bytes-written", resultNetPayloadBytesWritten);
+ put(o, "result-net-signaling-bytes-read", resultNetSignalingBytesRead);
+ put(o, "result-net-signaling-bytes-written", resultNetSignalingBytesWritten);
put(o, "ipc-messages-sent", ipcMessagesSent);
put(o, "ipc-message-bytes-sent", ipcMessageBytesSent);
put(o, "ipc-messages-received", ipcMessagesReceived);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 6230f1d..fc0154e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -22,10 +22,10 @@
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.deployment.DeploymentStatus;
import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index aa3260d..95c67df 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -157,13 +157,13 @@
case DATA_PUBLIC_PORT:
return "Public IP port to announce data listener";
case RESULT_LISTEN_ADDRESS:
- return "IP Address to bind dataset result distribution listener";
+ return "IP Address to bind result distribution listener";
case RESULT_LISTEN_PORT:
- return "IP port to bind dataset result distribution listener";
+ return "IP port to bind result distribution listener";
case RESULT_PUBLIC_ADDRESS:
- return "Public IP Address to announce dataset result distribution listener";
+ return "Public IP Address to announce result distribution listener";
case RESULT_PUBLIC_PORT:
- return "Public IP port to announce dataset result distribution listener";
+ return "Public IP port to announce result distribution listener";
case MESSAGING_LISTEN_ADDRESS:
return "IP Address to bind messaging listener";
case MESSAGING_LISTEN_PORT:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index c2f2e1a..b4d835d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -43,7 +43,7 @@
private final NetworkAddress dataPort;
- private final NetworkAddress datasetPort;
+ private final NetworkAddress resultPort;
private final String osName;
@@ -78,12 +78,12 @@
private final NodeCapacity capacity;
public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
- NetworkAddress datasetPort, HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity) {
+ NetworkAddress resultPort, HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity) {
this.ncAddress = ncAddress;
this.nodeId = nodeId;
this.ncConfig = ncConfig;
this.dataPort = dataPort;
- this.datasetPort = datasetPort;
+ this.resultPort = resultPort;
this.hbSchema = hbSchema;
this.messagingPort = messagingPort;
this.capacity = capacity;
@@ -122,8 +122,8 @@
return dataPort;
}
- public NetworkAddress getDatasetPort() {
- return datasetPort;
+ public NetworkAddress getResultPort() {
+ return resultPort;
}
public String getOSName() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/heartbeat/HeartbeatData.java
index 76b68bb..b8ba173 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -44,10 +44,10 @@
public long netPayloadBytesWritten;
public long netSignalingBytesRead;
public long netSignalingBytesWritten;
- public long datasetNetPayloadBytesRead;
- public long datasetNetPayloadBytesWritten;
- public long datasetNetSignalingBytesRead;
- public long datasetNetSignalingBytesWritten;
+ public long resultNetPayloadBytesRead;
+ public long resultNetPayloadBytesWritten;
+ public long resultNetSignalingBytesRead;
+ public long resultNetSignalingBytesWritten;
public long ipcMessagesSent;
public long ipcMessageBytesSent;
public long ipcMessagesReceived;
@@ -79,9 +79,9 @@
netSignalingBytesRead = dis.readLong();
netSignalingBytesWritten = dis.readLong();
netSignalingBytesWritten = dis.readLong();
- datasetNetPayloadBytesWritten = dis.readLong();
- datasetNetSignalingBytesRead = dis.readLong();
- datasetNetSignalingBytesWritten = dis.readLong();
+ resultNetPayloadBytesWritten = dis.readLong();
+ resultNetSignalingBytesRead = dis.readLong();
+ resultNetSignalingBytesWritten = dis.readLong();
ipcMessagesSent = dis.readLong();
ipcMessageBytesSent = dis.readLong();
ipcMessagesReceived = dis.readLong();
@@ -119,10 +119,10 @@
dos.writeLong(netPayloadBytesWritten);
dos.writeLong(netSignalingBytesRead);
dos.writeLong(netSignalingBytesWritten);
- dos.writeLong(datasetNetPayloadBytesRead);
- dos.writeLong(datasetNetPayloadBytesWritten);
- dos.writeLong(datasetNetSignalingBytesRead);
- dos.writeLong(datasetNetSignalingBytesWritten);
+ dos.writeLong(resultNetPayloadBytesRead);
+ dos.writeLong(resultNetPayloadBytesWritten);
+ dos.writeLong(resultNetSignalingBytesRead);
+ dos.writeLong(resultNetSignalingBytesWritten);
dos.writeLong(ipcMessagesSent);
dos.writeLong(ipcMessageBytesSent);
dos.writeLong(ipcMessagesReceived);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index dea5198..ce4578d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -46,13 +46,13 @@
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.connectors.ConnectorPolicyFactory;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.common.controllers.NodeParameters;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.deployment.DeploymentStatus;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index bf35e6b..8e2ec22 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -22,10 +22,10 @@
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.common.base.IClusterController;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.deployment.DeploymentStatus;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
similarity index 64%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
index f95229e..e314a6d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
@@ -16,40 +16,40 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.common.dataset;
+package org.apache.hyracks.control.common.result;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import org.apache.hyracks.api.dataset.IDatasetManager;
-import org.apache.hyracks.api.dataset.IDatasetStateRecord;
+import org.apache.hyracks.api.result.IResultManager;
+import org.apache.hyracks.api.result.IResultStateRecord;
import org.apache.hyracks.api.job.JobId;
-public abstract class AbstractDatasetManager implements IDatasetManager {
+public abstract class AbstractResultManager implements IResultManager {
private final long nanoResultTTL;
- protected AbstractDatasetManager(long resultTTL) {
+ protected AbstractResultManager(long resultTTL) {
this.nanoResultTTL = TimeUnit.MILLISECONDS.toNanos(resultTTL);
}
@Override
- public synchronized void sweepExpiredDatasets() {
- final List<JobId> expiredDatasets = new ArrayList<>();
+ public synchronized void sweepExpiredResultSets() {
+ final List<JobId> expiredResultSets = new ArrayList<>();
final long sweepTime = System.nanoTime();
for (JobId jobId : getJobIds()) {
- final IDatasetStateRecord state = getState(jobId);
+ final IResultStateRecord state = getState(jobId);
if (state != null && hasExpired(state, sweepTime, nanoResultTTL)) {
- expiredDatasets.add(jobId);
+ expiredResultSets.add(jobId);
}
}
- for (JobId jobId : expiredDatasets) {
+ for (JobId jobId : expiredResultSets) {
sweep(jobId);
}
}
- private static boolean hasExpired(IDatasetStateRecord dataset, long currentTime, long ttl) {
- return currentTime - dataset.getTimestamp() - ttl > 0;
+ private static boolean hasExpired(IResultStateRecord state, long currentTime, long ttl) {
+ return currentTime - state.getTimestamp() - ttl > 0;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/ResultStateSweeper.java
similarity index 81%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/ResultStateSweeper.java
index 901ec67..76bbabb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/ResultStateSweeper.java
@@ -17,9 +17,9 @@
* under the License.
*/
-package org.apache.hyracks.control.common.dataset;
+package org.apache.hyracks.control.common.result;
-import org.apache.hyracks.api.dataset.IDatasetManager;
+import org.apache.hyracks.api.result.IResultManager;
import org.apache.logging.log4j.Logger;
/**
@@ -27,12 +27,12 @@
*/
public class ResultStateSweeper implements Runnable {
- private final IDatasetManager datasetManager;
+ private final IResultManager resultManager;
private final long resultSweepThreshold;
private final Logger logger;
- public ResultStateSweeper(IDatasetManager datasetManager, long resultSweepThreshold, Logger logger) {
- this.datasetManager = datasetManager;
+ public ResultStateSweeper(IResultManager resultManager, long resultSweepThreshold, Logger logger) {
+ this.resultManager = resultManager;
this.resultSweepThreshold = resultSweepThreshold;
this.logger = logger;
}
@@ -42,7 +42,7 @@
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(resultSweepThreshold);
- datasetManager.sweepExpiredDatasets();
+ resultManager.sweepExpiredResultSets();
logger.trace("Result state cleanup instance successfully completed.");
} catch (InterruptedException e) {
logger.warn("Result cleaner thread interrupted, shutting down.");
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 804bacd..169e5ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -46,7 +46,6 @@
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.control.CcId;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -57,6 +56,7 @@
import org.apache.hyracks.api.job.JobParameterByteStore;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.api.util.InvokeUtil;
@@ -74,15 +74,15 @@
import org.apache.hyracks.control.common.work.FutureValue;
import org.apache.hyracks.control.common.work.WorkQueue;
import org.apache.hyracks.control.nc.application.NCServiceContext;
-import org.apache.hyracks.control.nc.dataset.DatasetPartitionManager;
import org.apache.hyracks.control.nc.heartbeat.HeartbeatComputeTask;
import org.apache.hyracks.control.nc.heartbeat.HeartbeatTask;
import org.apache.hyracks.control.nc.io.IOManager;
-import org.apache.hyracks.control.nc.net.DatasetNetworkManager;
import org.apache.hyracks.control.nc.net.MessagingNetworkManager;
import org.apache.hyracks.control.nc.net.NetworkManager;
+import org.apache.hyracks.control.nc.net.ResultNetworkManager;
import org.apache.hyracks.control.nc.partitions.PartitionManager;
import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
+import org.apache.hyracks.control.nc.result.ResultPartitionManager;
import org.apache.hyracks.control.nc.work.AbortAllJobsWork;
import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
import org.apache.hyracks.ipc.api.IIPCEventListener;
@@ -118,9 +118,9 @@
private NetworkManager netManager;
- private IDatasetPartitionManager datasetPartitionManager;
+ private IResultPartitionManager resultPartitionManager;
- private DatasetNetworkManager datasetNetworkManager;
+ private ResultNetworkManager resultNetworkManager;
private final WorkQueue workQueue;
@@ -262,10 +262,10 @@
}
private void init() {
- datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.getResultManagerMemory(),
+ resultPartitionManager = new ResultPartitionManager(this, executor, ncConfig.getResultManagerMemory(),
ncConfig.getResultTTL(), ncConfig.getResultSweepThreshold());
- datasetNetworkManager = new DatasetNetworkManager(ncConfig.getResultListenAddress(),
- ncConfig.getResultListenPort(), datasetPartitionManager, ncConfig.getNetThreadCount(),
+ resultNetworkManager = new ResultNetworkManager(ncConfig.getResultListenAddress(),
+ ncConfig.getResultListenPort(), resultPartitionManager, ncConfig.getNetThreadCount(),
ncConfig.getNetBufferCount(), ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(),
FullFrameChannelInterfaceFactory.INSTANCE);
if (ncConfig.getMessagingListenAddress() != null && serviceCtx.getMessagingChannelInterfaceFactory() != null) {
@@ -289,7 +289,7 @@
netManager.start();
startApplication();
init();
- datasetNetworkManager.start();
+ resultNetworkManager.start();
if (messagingNetManager != null) {
messagingNetManager.start();
}
@@ -323,11 +323,11 @@
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
- NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
+ NetworkAddress resultAddress = resultNetworkManager.getPublicNetworkAddress();
NetworkAddress netAddress = netManager.getPublicNetworkAddress();
NetworkAddress messagingAddress =
messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
- nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, hbSchema,
+ nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, resultAddress, hbSchema,
messagingAddress, application.getCapacity());
ncData = new NodeControllerData(nodeRegistration);
@@ -494,9 +494,9 @@
LOGGER.log(Level.ERROR, "Some jobs failed to exit, continuing with abnormal shutdown");
}
partitionManager.close();
- datasetPartitionManager.close();
+ resultPartitionManager.close();
netManager.stop();
- datasetNetworkManager.stop();
+ resultNetworkManager.stop();
if (messagingNetManager != null) {
messagingNetManager.stop();
}
@@ -582,8 +582,8 @@
return netManager;
}
- public DatasetNetworkManager getDatasetNetworkManager() {
- return datasetNetworkManager;
+ public ResultNetworkManager getResultNetworkManager() {
+ return resultNetworkManager;
}
public PartitionManager getPartitionManager() {
@@ -645,8 +645,8 @@
getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id);
}
- public IDatasetPartitionManager getDatasetPartitionManager() {
- return datasetPartitionManager;
+ public IResultPartitionManager getResultPartitionManager() {
+ return resultPartitionManager;
}
public MessagingNetworkManager getMessagingNetworkManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 340924d..f6531d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -43,7 +43,6 @@
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataflow.state.IStateObject;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -57,6 +56,7 @@
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.common.job.PartitionState;
@@ -427,8 +427,8 @@
}
@Override
- public IDatasetPartitionManager getDatasetPartitionManager() {
- return ncs.getDatasetPartitionManager();
+ public IResultPartitionManager getResultPartitionManager() {
+ return ncs.getResultPartitionManager();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
index dfa5ff3..9bfd061 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
@@ -85,11 +85,11 @@
hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
- MuxDemuxPerformanceCounters datasetNetPC = ncs.getDatasetNetworkManager().getPerformanceCounters();
- hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
- hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
- hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
- hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
+ MuxDemuxPerformanceCounters resultNetPC = ncs.getResultNetworkManager().getPerformanceCounters();
+ hbData.resultNetPayloadBytesRead = resultNetPC.getPayloadBytesRead();
+ hbData.resultNetPayloadBytesWritten = resultNetPC.getPayloadBytesWritten();
+ hbData.resultNetSignalingBytesRead = resultNetPC.getSignalingBytesRead();
+ hbData.resultNetSignalingBytesWritten = resultNetPC.getSignalingBytesWritten();
IPCPerformanceCounters ipcPC = ncs.getIpcSystem().getPerformanceCounters();
hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
similarity index 90%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
index 5eba281..e56bfe6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
@@ -26,11 +26,11 @@
import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.exceptions.NetException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultPartitionManager;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
@@ -41,14 +41,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class DatasetNetworkManager implements IChannelConnectionFactory {
+public class ResultNetworkManager implements IChannelConnectionFactory {
private static final Logger LOGGER = LogManager.getLogger();
private static final int MAX_CONNECTION_ATTEMPTS = 5;
static final int INITIAL_MESSAGE_SIZE = 20;
- private final IDatasetPartitionManager partitionManager;
+ private final IResultPartitionManager partitionManager;
private final MuxDemux md;
@@ -70,7 +70,7 @@
* - Port to report to consumers; useful when
* behind NAT. Ignored if publicInetAddress is null. 0 = same as inetPort
*/
- public DatasetNetworkManager(String inetAddress, int inetPort, IDatasetPartitionManager partitionManager,
+ public ResultNetworkManager(String inetAddress, int inetPort, IResultPartitionManager partitionManager,
int nThreads, int nBuffers, String publicInetAddress, int publicInetPort,
IChannelInterfaceFactory channelInterfaceFactory) {
this.partitionManager = partitionManager;
@@ -138,13 +138,14 @@
ResultSetId rsId = new ResultSetId(buffer.getLong());
int partition = buffer.getInt();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Received initial dataset partition read request for JobId: " + jobId + " partition: "
+ LOGGER.debug("Received initial result partition read request for JobId: " + jobId + " partition: "
+ partition + " on channel: " + ccb);
}
noc = new NetworkOutputChannel(ccb, nBuffers);
try {
- partitionManager.initializeDatasetPartitionReader(jobId, rsId, partition, noc);
+ partitionManager.initializeResultPartitionReader(jobId, rsId, partition, noc);
} catch (HyracksException e) {
+ LOGGER.warn("Failed to initialize result partition reader", e);
noc.abort();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/Page.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/Page.java
similarity index 95%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/Page.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/Page.java
index 2eb33fd..47230a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/Page.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/Page.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.nc.dataset;
+package org.apache.hyracks.control.nc.result;
import java.nio.ByteBuffer;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultMemoryManager.java
similarity index 84%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetMemoryManager.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultMemoryManager.java
index 37cddb8..34c9eb5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultMemoryManager.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.nc.dataset;
+package org.apache.hyracks.control.nc.result;
import java.nio.ByteBuffer;
import java.util.HashMap;
@@ -28,7 +28,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.partitions.ResultSetPartitionId;
-public class DatasetMemoryManager {
+public class ResultMemoryManager {
private int availableMemory;
private final Set<Page> availPages;
@@ -39,14 +39,15 @@
private final static int FRAME_SIZE = 32768;
- public DatasetMemoryManager(int availableMemory) {
+ public ResultMemoryManager(int availableMemory) {
this.availableMemory = availableMemory;
availPages = new HashSet<Page>();
// Atleast have one page for temporarily storing the results.
- if (this.availableMemory <= FRAME_SIZE)
+ if (this.availableMemory <= FRAME_SIZE) {
this.availableMemory = FRAME_SIZE;
+ }
leastRecentlyUsedList = new LeastRecentlyUsedList();
resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
@@ -57,8 +58,8 @@
Page page;
if (availPages.isEmpty()) {
if (availableMemory >= FRAME_SIZE) {
- /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame()
- * instead of direct ByteBuffer.allocate()?
+ /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's
+ * allocateFrame() instead of direct ByteBuffer.allocate()?
*/
availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
availableMemory -= FRAME_SIZE;
@@ -73,10 +74,11 @@
page.clear();
/*
- * It is extremely important to update the reference after obtaining the page because, in the cases where memory
- * manager is allocated only one page of memory, the front of the LRU list should not be created by the
- * update reference call before a page is pushed on to the element of the LRU list. So we first obtain the page,
- * then make a updateReference call which in turn creates a new node in the LRU list and then add the page to it.
+ * It is extremely important to update the reference after obtaining the page because, in the cases where
+ * memory manager is allocated only one page of memory, the front of the LRU list should not be created by the
+ * update reference call before a page is pushed on to the element of the LRU list. So we first obtain the
+ * page, then make a updateReference call which in turn creates a new node in the LRU list and then add the
+ * page to it.
*/
PartitionNode pn = updateReference(resultSetPartitionId, resultState);
pn.add(page);
@@ -84,7 +86,7 @@
}
public void pageReferenced(ResultSetPartitionId resultSetPartitionId) {
- // When a page is referenced the dataset partition writer should already be known, so we pass null.
+ // When a page is referenced the result partition writer should already be known, so we pass null.
updateReference(resultSetPartitionId, null);
}
@@ -121,8 +123,8 @@
ResultState resultState = pn.getResultState();
Page page = resultState.returnPage();
- /* If the partition holding the pages breaks the contract by not returning the page or it has no page, just take
- * away all the pages allocated to it and add to the available pages set.
+ /* If the partition holding the pages breaks the contract by not returning the page or it has no page, just
+ * take away all the pages allocated to it and add to the available pages set.
*/
if (page == null) {
availPages.addAll(pn);
@@ -130,9 +132,9 @@
resultPartitionNodesMap.remove(pn.getResultSetPartitionId());
leastRecentlyUsedList.remove(pn);
- /* Based on the assumption that if the dataset partition writer returned a null page, it should be lying about
- * the number of pages it holds in which case we just evict all the pages it holds and should thus be able to
- * add all those pages to available set and we have at least one page to allocate back.
+ /* Based on the assumption that if the result partition writer returned a null page, it should be lying
+ * about the number of pages it holds in which case we just evict all the pages it holds and should thus be
+ * able to add all those pages to available set and we have at least one page to allocate back.
*/
page = getAvailablePage();
} else {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
similarity index 80%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
index b7cf9a4..82c7d50 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.nc.dataset;
+package org.apache.hyracks.control.nc.result;
import java.util.HashMap;
import java.util.Map;
@@ -25,20 +25,20 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.common.dataset.AbstractDatasetManager;
-import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
+import org.apache.hyracks.api.result.IResultPartitionManager;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.control.common.result.AbstractResultManager;
+import org.apache.hyracks.control.common.result.ResultStateSweeper;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class DatasetPartitionManager extends AbstractDatasetManager implements IDatasetPartitionManager {
+public class ResultPartitionManager extends AbstractResultManager implements IResultPartitionManager {
private static final Logger LOGGER = LogManager.getLogger();
private final NodeControllerService ncs;
@@ -51,32 +51,32 @@
private final IWorkspaceFileFactory fileFactory;
- private final DatasetMemoryManager datasetMemoryManager;
+ private final ResultMemoryManager resultMemoryManager;
- public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
+ public ResultPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
long resultSweepThreshold) {
super(resultTTL);
this.ncs = ncs;
this.executor = executor;
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
- if (availableMemory >= DatasetMemoryManager.getPageSize()) {
- datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+ if (availableMemory >= ResultMemoryManager.getPageSize()) {
+ resultMemoryManager = new ResultMemoryManager(availableMemory);
} else {
- datasetMemoryManager = null;
+ resultMemoryManager = null;
}
partitionResultStateMap = new HashMap<>();
executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
}
@Override
- public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+ public IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
boolean asyncMode, int partition, int nPartitions, long maxReads) {
- DatasetPartitionWriter dpw;
+ ResultPartitionWriter dpw;
JobId jobId = ctx.getJobletContext().getJobId();
synchronized (this) {
- dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
- datasetMemoryManager, fileFactory, maxReads);
+ dpw = new ResultPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
+ resultMemoryManager, fileFactory, maxReads);
ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions);
resultStates[partition] = dpw.getResultState();
@@ -91,7 +91,7 @@
try {
// Be sure to send the *public* network address to the CC
ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, orderedResult,
- emptyResult, partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress());
+ emptyResult, partition, nPartitions, ncs.getResultNetworkManager().getPublicNetworkAddress());
} catch (Exception e) {
throw HyracksException.create(e);
}
@@ -109,10 +109,10 @@
}
@Override
- public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition,
+ public void initializeResultPartitionReader(JobId jobId, ResultSetId resultSetId, int partition,
IFrameWriter writer) throws HyracksException {
ResultState resultState = getResultState(jobId, resultSetId, partition);
- DatasetPartitionReader dpr = new DatasetPartitionReader(this, datasetMemoryManager, executor, resultState);
+ ResultPartitionReader dpr = new ResultPartitionReader(this, resultMemoryManager, executor, resultState);
dpr.writeTo(writer);
LOGGER.debug("Initialized partition reader: JobId: {}:ResultSetId: {}:partition: {}", jobId, resultSetId,
partition);
@@ -130,7 +130,7 @@
}
ResultState resultState = resultStates[partition];
if (resultState == null) {
- throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
+ throw new HyracksException("No ResultPartitionWriter for partition " + partition);
}
return resultState;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
similarity index 83%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
index 8c4fcb0..121d5a1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.nc.dataset;
+package org.apache.hyracks.control.nc.result;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
@@ -28,18 +28,18 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class DatasetPartitionReader {
+public class ResultPartitionReader {
private static final Logger LOGGER = LogManager.getLogger();
- private final DatasetPartitionManager datasetPartitionManager;
- private final DatasetMemoryManager datasetMemoryManager;
+ private final ResultPartitionManager resultPartitionManager;
+ private final ResultMemoryManager resultMemoryManager;
private final Executor executor;
private final ResultState resultState;
- public DatasetPartitionReader(DatasetPartitionManager datasetPartitionManager,
- DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
- this.datasetPartitionManager = datasetPartitionManager;
- this.datasetMemoryManager = datasetMemoryManager;
+ public ResultPartitionReader(ResultPartitionManager resultPartitionManager, ResultMemoryManager resultMemoryManager,
+ Executor executor, ResultState resultState) {
+ this.resultPartitionManager = resultPartitionManager;
+ this.resultMemoryManager = resultMemoryManager;
this.executor = executor;
this.resultState = resultState;
}
@@ -89,7 +89,7 @@
}
private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
- return datasetMemoryManager != null ? resultState.read(datasetMemoryManager, offset, buffer)
+ return resultMemoryManager != null ? resultState.read(resultMemoryManager, offset, buffer)
: resultState.read(offset, buffer);
}
@@ -99,7 +99,7 @@
resultState.readClose();
if (resultState.isExhausted()) {
final ResultSetPartitionId partitionId = resultState.getResultSetPartitionId();
- datasetPartitionManager.removePartition(partitionId.getJobId(), partitionId.getResultSetId(),
+ resultPartitionManager.removePartition(partitionId.getJobId(), partitionId.getResultSetId(),
partitionId.getPartition());
}
} catch (HyracksDataException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
similarity index 81%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
index b593bb5..f25bc58 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
@@ -16,26 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.nc.dataset;
+package org.apache.hyracks.control.nc.result;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.partitions.ResultSetPartitionId;
+import org.apache.hyracks.api.result.IResultPartitionManager;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class DatasetPartitionWriter implements IFrameWriter {
+public class ResultPartitionWriter implements IFrameWriter {
private static final Logger LOGGER = LogManager.getLogger();
- private final IDatasetPartitionManager manager;
+ private final IResultPartitionManager manager;
private final JobId jobId;
@@ -47,7 +47,7 @@
private final int nPartitions;
- private final DatasetMemoryManager datasetMemoryManager;
+ private final ResultMemoryManager resultMemoryManager;
private final ResultSetPartitionId resultSetPartitionId;
@@ -57,16 +57,16 @@
private boolean failed = false;
- public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
+ public ResultPartitionWriter(IHyracksTaskContext ctx, IResultPartitionManager manager, JobId jobId,
ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
- DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory, long maxReads) {
+ ResultMemoryManager resultMemoryManager, IWorkspaceFileFactory fileFactory, long maxReads) {
this.manager = manager;
this.jobId = jobId;
this.resultSetId = rsId;
this.orderedResult = orderedResult;
this.partition = partition;
this.nPartitions = nPartitions;
- this.datasetMemoryManager = datasetMemoryManager;
+ this.resultMemoryManager = resultMemoryManager;
resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
resultState = new ResultState(resultSetPartitionId, asyncMode, ctx.getIoManager(), fileFactory,
@@ -89,10 +89,10 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
registerResultPartitionLocation(false);
- if (datasetMemoryManager == null) {
+ if (resultMemoryManager == null) {
resultState.write(buffer);
} else {
- resultState.write(datasetMemoryManager, buffer);
+ resultState.write(resultMemoryManager, buffer);
}
}
@@ -132,11 +132,7 @@
partitionRegistered = true;
}
} catch (HyracksException e) {
- if (e instanceof HyracksDataException) {
- throw (HyracksDataException) e;
- } else {
- throw HyracksDataException.create(e);
- }
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
similarity index 94%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
index 1a64a5a..41b7f07 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
@@ -16,19 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.nc.dataset;
+package org.apache.hyracks.control.nc.result;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-import org.apache.hyracks.api.dataset.IDatasetStateRecord;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultStateRecord;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-class ResultSetMap implements IDatasetStateRecord, Serializable {
+class ResultSetMap implements IResultStateRecord, Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LogManager.getLogger();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java
similarity index 94%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java
index 6b35912..25d3f00 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.nc.dataset;
+package org.apache.hyracks.control.nc.result;
import java.io.DataInput;
import java.io.DataOutput;
@@ -139,7 +139,7 @@
notifyAll();
}
- public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
+ public synchronized void write(ResultMemoryManager resultMemoryManager, ByteBuffer buffer)
throws HyracksDataException {
int srcOffset = 0;
Page destPage = null;
@@ -150,7 +150,7 @@
while (srcOffset < buffer.limit()) {
if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
- destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+ destPage = resultMemoryManager.requestPage(resultSetPartitionId, this);
localPageList.add(destPage);
}
int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
@@ -197,7 +197,7 @@
return readSize;
}
- public synchronized long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
+ public synchronized long read(ResultMemoryManager resultMemoryManager, long offset, ByteBuffer buffer)
throws HyracksDataException {
long readSize = 0;
while (offset >= size && !eos.get() && !failed.get()) {
@@ -224,8 +224,8 @@
if (readSize < buffer.capacity()) {
long localPageOffset = offset - persistentSize;
- int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize());
- int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize());
+ int localPageIndex = (int) (localPageOffset / ResultMemoryManager.getPageSize());
+ int pageOffset = (int) (localPageOffset % ResultMemoryManager.getPageSize());
Page page = getPage(localPageIndex);
if (page == null) {
return readSize;
@@ -233,7 +233,7 @@
readSize += buffer.remaining();
buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
}
- datasetMemoryManager.pageReferenced(resultSetPartitionId);
+ resultMemoryManager.pageReferenced(resultSetPartitionId);
return readSize;
}
@@ -245,7 +245,8 @@
public synchronized Page returnPage() throws HyracksDataException {
Page page = removePage();
- // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
+ // If we do not have any pages to be given back close the write channel since we don't write any more, return
+ // null.
if (page == null) {
ioManager.close(fileHandle);
return null;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index c6696fd..b11dada 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -23,9 +23,9 @@
import java.util.Deque;
import org.apache.hyracks.api.control.CcId;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.control.common.work.SynchronizableWork;
import org.apache.hyracks.control.nc.Joblet;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -48,9 +48,9 @@
@Override
protected void doRun() throws Exception {
LOGGER.info("Aborting all tasks for controller {}", ccId);
- IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
- if (dpm == null) {
- LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId());
+ IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager();
+ if (resultPartitionManager == null) {
+ LOGGER.log(Level.WARN, "ResultPartitionManager is null on " + ncs.getId());
}
Deque<Task> abortedTasks = new ArrayDeque<>();
Collection<Joblet> joblets = ncs.getJobletMap().values();
@@ -61,9 +61,9 @@
abortedTasks.add(task);
});
final JobId jobId = joblet.getJobId();
- if (dpm != null) {
- dpm.abortReader(jobId);
- dpm.sweep(jobId);
+ if (resultPartitionManager != null) {
+ resultPartitionManager.abortReader(jobId);
+ resultPartitionManager.sweep(jobId);
}
ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE));
});
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
index 80f3e98..f47e1ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
@@ -22,8 +22,8 @@
import java.util.Map;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.Joblet;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -52,9 +52,9 @@
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
}
- IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
- if (dpm != null) {
- ncs.getDatasetPartitionManager().abortReader(jobId);
+ IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager();
+ if (resultPartitionManager != null) {
+ ncs.getResultPartitionManager().abortReader(jobId);
}
Joblet ji = ncs.getJobletMap().get(jobId);
if (ji != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index ac80afa..7dbb3c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -21,8 +21,8 @@
import java.util.List;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
@@ -52,9 +52,9 @@
LOGGER.log(Level.WARN, ncs.getId() + " is sending a notification to cc that task " + taskId + " has failed",
exceptions.get(0));
try {
- IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
- if (dpm != null) {
- dpm.abortReader(jobId);
+ IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager();
+ if (resultPartitionManager != null) {
+ resultPartitionManager.abortReader(jobId);
}
ncs.getClusterController(jobId.getCcId()).notifyTaskFailure(jobId, taskId, ncs.getId(), exceptions);
} catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index b0cc40c..9fb9815 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -31,11 +31,11 @@
import org.apache.hyracks.api.dataflow.value.IResultSerializer;
import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.result.IResultPartitionManager;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.dataflow.common.comm.io.FrameOutputStream;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
@@ -67,7 +67,7 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
- final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
+ final IResultPartitionManager resultPartitionManager = ctx.getResultPartitionManager();
final IFrame frame = new VSizeFrame(ctx);
@@ -82,15 +82,15 @@
final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(outRecordDesc);
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private IFrameWriter datasetPartitionWriter;
+ private IFrameWriter resultPartitionWriter;
private boolean failed = false;
@Override
public void open() throws HyracksDataException {
try {
- datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
- nPartitions, maxReads);
- datasetPartitionWriter.open();
+ resultPartitionWriter = resultPartitionManager.createResultPartitionWriter(ctx, rsId, ordered,
+ asyncMode, partition, nPartitions, maxReads);
+ resultPartitionWriter.open();
resultSerializer.init();
} catch (HyracksException e) {
throw HyracksDataException.create(e);
@@ -103,7 +103,7 @@
for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
resultSerializer.appendTuple(frameTupleAccessor, tIndex);
if (!frameOutputStream.appendTuple()) {
- frameOutputStream.flush(datasetPartitionWriter);
+ frameOutputStream.flush(resultPartitionWriter);
resultSerializer.appendTuple(frameTupleAccessor, tIndex);
frameOutputStream.appendTuple();
@@ -114,23 +114,23 @@
@Override
public void fail() throws HyracksDataException {
failed = true;
- if (datasetPartitionWriter != null) {
- datasetPartitionWriter.fail();
+ if (resultPartitionWriter != null) {
+ resultPartitionWriter.fail();
}
}
@Override
public void close() throws HyracksDataException {
- if (datasetPartitionWriter != null) {
+ if (resultPartitionWriter != null) {
try {
if (!failed && frameOutputStream.getTupleCount() > 0) {
- frameOutputStream.flush(datasetPartitionWriter);
+ frameOutputStream.flush(resultPartitionWriter);
}
} catch (Exception e) {
- datasetPartitionWriter.fail();
+ resultPartitionWriter.fail();
throw e;
} finally {
- datasetPartitionWriter.close();
+ resultPartitionWriter.close();
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index 0931501..5dcc99a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -36,16 +36,16 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.IResultSetReader;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.client.result.ResultSet;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -154,8 +154,8 @@
IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
- IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
- IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, resultSetId);
+ IResultSet resultSet = new ResultSet(hcc, spec.getFrameSize(), nReaders);
+ IResultSetReader reader = resultSet.createReader(jobId, resultSetId);
List<String> resultRecords = new ArrayList<>();
ByteBufferInputStream bbis = new ByteBufferInputStream();
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 58da8a2..55fd9a0 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -29,8 +29,6 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobFlag;
@@ -38,7 +36,9 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
-import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.IResultSetReader;
+import org.apache.hyracks.client.result.ResultSet;
import org.apache.hyracks.control.cc.BaseCCApplication;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.application.CCServiceContext;
@@ -160,8 +160,8 @@
IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
if (!spec.getResultSetIds().isEmpty()) {
- IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
- IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
+ IResultSet resultSet = new ResultSet(hcc, spec.getFrameSize(), nReaders);
+ IResultSetReader reader = resultSet.createReader(jobId, spec.getResultSetIds().get(0));
ObjectMapper om = new ObjectMapper();
ArrayNode resultRecords = om.createArrayNode();
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index 752c643..39e7a45 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -28,10 +28,10 @@
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
index c28a5aa..c542fe9 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
@@ -27,10 +27,10 @@
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 9e795bf..1dce88d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -27,10 +27,10 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 49dee84..8d3b0dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -29,10 +29,10 @@
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
index 672d2c4..9d79d80 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
@@ -45,8 +45,8 @@
"nonheap-committed-sizes", "nonheap-max-sizes", "application-memory-budget", "application-cpu-core-budget",
"thread-counts", "peak-thread-counts", "system-load-averages", "gc-names", "gc-collection-counts",
"gc-collection-times", "net-payload-bytes-read", "net-payload-bytes-written", "net-signaling-bytes-read",
- "net-signaling-bytes-written", "dataset-net-payload-bytes-read", "dataset-net-payload-bytes-written",
- "dataset-net-signaling-bytes-read", "dataset-net-signaling-bytes-written", "ipc-messages-sent",
+ "net-signaling-bytes-written", "result-net-payload-bytes-read", "result-net-payload-bytes-written",
+ "result-net-signaling-bytes-read", "result-net-signaling-bytes-written", "ipc-messages-sent",
"ipc-message-bytes-sent", "ipc-messages-received", "ipc-message-bytes-received", "disk-reads",
"disk-writes", "config" };
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
index 09629b2..bc5de11 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
@@ -27,10 +27,10 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
index 75ba33f..7838a34 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
@@ -26,10 +26,10 @@
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
index 315b74c..06c0eaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
@@ -26,7 +26,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobSpecification;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 816f3fa..6154e28 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -28,10 +28,10 @@
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index c2b3263..0eb6810 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -32,11 +32,11 @@
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
index 81a71eb..48f7837 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
@@ -24,10 +24,10 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index ac52573..174d5cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -30,7 +30,6 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataflow.state.IStateObject;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -39,6 +38,7 @@
import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.control.common.job.profiling.StatsCollector;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -134,7 +134,7 @@
}
@Override
- public IDatasetPartitionManager getDatasetPartitionManager() {
+ public IResultPartitionManager getResultPartitionManager() {
return null;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index 2f58259..1ceb199 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -19,7 +19,7 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.util;
-import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
@@ -44,7 +44,6 @@
import org.apache.hyracks.api.dataflow.state.IStateObject;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -55,6 +54,7 @@
import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.data.std.util.GrowableArray;
@@ -726,7 +726,7 @@
}
@Override
- public IDatasetPartitionManager getDatasetPartitionManager() {
+ public IResultPartitionManager getResultPartitionManager() {
return null;
}