Merge branch 'master' of https://code.google.com/p/hyracks
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
index 478d64e..bff23bc 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -17,7 +17,7 @@
import java.util.HashMap;
import java.util.List;
-public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> {
+public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> implements IDatasetStateRecord {
public enum Status {
RUNNING,
SUCCESS,
@@ -26,11 +26,14 @@
private static final long serialVersionUID = 1L;
+ private final long timestamp;
+
private Status status;
private List<Exception> exceptions;
public DatasetJobRecord() {
+ this.timestamp = System.currentTimeMillis();
this.status = Status.RUNNING;
}
@@ -51,6 +54,10 @@
this.exceptions = exceptions;
}
+ public long getTimestamp() {
+ return timestamp;
+ }
+
public Status getStatus() {
return status;
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
similarity index 68%
copy from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
copy to hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
index aad5b55..b6cd1af 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
@@ -14,20 +14,12 @@
*/
package edu.uci.ics.hyracks.api.dataset;
-import java.nio.ByteBuffer;
+import java.util.Map;
-public class Page {
- private final ByteBuffer buffer;
+import edu.uci.ics.hyracks.api.job.JobId;
- public Page(ByteBuffer buffer) {
- this.buffer = buffer;
- }
+public interface IDatasetManager {
+ public Map<JobId, IDatasetStateRecord> getStateMap();
- public ByteBuffer getBuffer() {
- return buffer;
- }
-
- public ByteBuffer clear() {
- return (ByteBuffer) buffer.clear();
- }
+ public void deinitState(JobId jobId);
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index e8cceb9..32910ac 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -20,9 +20,9 @@
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
-public interface IDatasetPartitionManager {
+public interface IDatasetPartitionManager extends IDatasetManager {
public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions) throws HyracksException;
+ boolean asyncMode, int partition, int nPartitions) throws HyracksException;
public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition)
throws HyracksException;
@@ -32,6 +32,8 @@
public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
throws HyracksException;
+ public void removePartition(JobId jobId, ResultSetId resultSetId, int partition);
+
public void abortReader(JobId jobId);
public IWorkspaceFileFactory getFileFactory();
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java
similarity index 68%
copy from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
copy to hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java
index aad5b55..1a400e3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java
@@ -14,20 +14,6 @@
*/
package edu.uci.ics.hyracks.api.dataset;
-import java.nio.ByteBuffer;
-
-public class Page {
- private final ByteBuffer buffer;
-
- public Page(ByteBuffer buffer) {
- this.buffer = buffer;
- }
-
- public ByteBuffer getBuffer() {
- return buffer;
- }
-
- public ByteBuffer clear() {
- return (ByteBuffer) buffer.clear();
- }
+public interface IDatasetStateRecord {
+ public long getTimestamp();
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index e80c263..0463350 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -40,7 +40,6 @@
import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -48,6 +47,7 @@
import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.dataset.DatasetDirectoryService;
+import edu.uci.ics.hyracks.control.cc.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
@@ -188,7 +188,7 @@
}
};
sweeper = new DeadNodeSweeper();
- datasetDirectoryService = new DatasetDirectoryService(ccConfig.jobHistorySize);
+ datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
jobCounter = 0;
deploymentRunMap = new HashMap<DeploymentId, DeploymentRun>();
@@ -220,6 +220,8 @@
timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
jobLog.open();
startApplication();
+
+ datasetDirectoryService.init(executor);
LOGGER.log(Level.INFO, "Started ClusterControllerService");
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 0db1c7a..ee1cc67 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -18,18 +18,20 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord;
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.dataset.ResultSetMetaData;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.dataset.ResultStateSweeper;
/**
* TODO(madhusudancs): The potential perils of this global dataset directory service implementation is that, the jobs
@@ -39,22 +41,27 @@
* job.
*/
public class DatasetDirectoryService implements IDatasetDirectoryService {
- private final Map<JobId, DatasetJobRecord> jobResultLocations;
+ private final long resultTTL;
- public DatasetDirectoryService(final int jobHistorySize) {
- jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>() {
- private static final long serialVersionUID = 1L;
+ private final long resultSweepThreshold;
- protected boolean removeEldestEntry(Map.Entry<JobId, DatasetJobRecord> eldest) {
- return size() > jobHistorySize;
- }
- };
+ private final Map<JobId, IDatasetStateRecord> jobResultLocations;
+
+ public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
+ this.resultTTL = resultTTL;
+ this.resultSweepThreshold = resultSweepThreshold;
+ jobResultLocations = new LinkedHashMap<JobId, IDatasetStateRecord>();
+ }
+
+ @Override
+ public void init(ExecutorService executor) {
+ executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
}
@Override
public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
throws HyracksException {
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
if (djr == null) {
djr = new DatasetJobRecord();
jobResultLocations.put(jobId, djr);
@@ -74,7 +81,7 @@
@Override
public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
int partition, int nPartitions, NetworkAddress networkAddress) {
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
ResultSetMetaData resultSetMetaData = djr.get(rsId);
if (resultSetMetaData == null) {
@@ -95,7 +102,7 @@
public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
int successCount = 0;
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
ResultSetMetaData resultSetMetaData = djr.get(rsId);
DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
records[partition].writeEOS();
@@ -113,7 +120,7 @@
@Override
public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
if (djr != null) {
djr.fail();
}
@@ -122,7 +129,7 @@
@Override
public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
if (djr != null) {
djr.fail(exceptions);
}
@@ -132,7 +139,7 @@
@Override
public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
DatasetJobRecord djr;
- while ((djr = jobResultLocations.get(jobId)) == null) {
+ while ((djr = (DatasetJobRecord) jobResultLocations.get(jobId)) == null) {
try {
wait();
} catch (InterruptedException e) {
@@ -144,6 +151,16 @@
}
@Override
+ public Map<JobId, IDatasetStateRecord> getStateMap() {
+ return jobResultLocations;
+ }
+
+ @Override
+ public void deinitState(JobId jobId) {
+ jobResultLocations.remove(jobResultLocations.get(jobId));
+ }
+
+ @Override
public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
DatasetDirectoryRecord[] newRecords;
@@ -191,7 +208,7 @@
*/
private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
throws HyracksDataException {
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
if (djr == null) {
throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
similarity index 83%
rename from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
rename to hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index ea5da5e..3de3c50 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -12,17 +12,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.api.dataset;
+package edu.uci.ics.hyracks.control.cc.dataset;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.IDatasetManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
import edu.uci.ics.hyracks.api.job.JobId;
-public interface IDatasetDirectoryService extends IJobLifecycleListener {
+public interface IDatasetDirectoryService extends IJobLifecycleListener, IDatasetManager {
+ public void init(ExecutorService executor);
+
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
int nPartitions, NetworkAddress networkAddress);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
index f1f1dc4..fd6b690 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -15,11 +15,11 @@
package edu.uci.ics.hyracks.control.cc.work;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.control.common.work.IResultCallback;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index c4acd47..c7244fb 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -52,6 +52,12 @@
@Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to the specified value. (default: 10)")
public int jobHistorySize = 10;
+ @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should be retained by the system in milliseconds. (default: 24 hours)")
+ public long resultTTL = 86400000;
+
+ @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup should be invoked in milliseconds. (default: 1 minute)")
+ public long resultSweepThreshold = 60000;
+
@Option(name = "-cc-root", usage = "Sets the root folder used for file operations. (default: ClusterControllerService)")
public String ccRoot = "ClusterControllerService";
@@ -86,6 +92,10 @@
cList.add(String.valueOf(defaultMaxJobAttempts));
cList.add("-job-history-size");
cList.add(String.valueOf(jobHistorySize));
+ cList.add("-result-time-to-live");
+ cList.add(String.valueOf(resultTTL));
+ cList.add("-result-sweep-threshold");
+ cList.add(String.valueOf(resultSweepThreshold));
cList.add("-cc-root");
cList.add(ccRoot);
if (clusterTopologyDefinition != null) {
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 32d8bf0..43129f5 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -23,100 +23,104 @@
import org.kohsuke.args4j.spi.StopOptionHandler;
public class NCConfig implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- @Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
- public String ccHost;
+ @Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
+ public String ccHost;
- @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
- public int ccPort = 1099;
+ @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
+ public int ccPort = 1099;
- @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
- public String clusterNetIPAddress;
+ @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
+ public String clusterNetIPAddress;
- @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
- public String nodeId;
+ @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
+ public String nodeId;
- @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
- public String dataIPAddress;
+ @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
+ public String dataIPAddress;
- @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
- public String datasetIPAddress;
+ @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
+ public String datasetIPAddress;
- @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
- public String ioDevices = System.getProperty("java.io.tmpdir");
+ @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
+ public String ioDevices = System.getProperty("java.io.tmpdir");
- @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
- public int nNetThreads = 1;
+ @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
+ public int nNetThreads = 1;
- @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
- public int maxMemory = -1;
+ @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
+ public int maxMemory = -1;
- @Option(name = "-result-history-size", usage = "Limits the number of jobs whose results should be remembered by the system to the specified value. (default: 10)")
- public int resultHistorySize = 100;
+ @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should be retained by the system in milliseconds. (default: 24 hours)")
+ public long resultTTL = 86400000;
- @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
- public int resultManagerMemory = -1;
+ @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup should be invoked in milliseconds. (default: 1 minute)")
+ public long resultSweepThreshold = 60000;
- @Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
- public String appNCMainClass;
+ @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
+ public int resultManagerMemory = -1;
- @Argument
- @Option(name = "--", handler = StopOptionHandler.class)
- public List<String> appArgs;
+ @Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
+ public String appNCMainClass;
- public void toCommandLine(List<String> cList) {
- cList.add("-cc-host");
- cList.add(ccHost);
- cList.add("-cc-port");
- cList.add(String.valueOf(ccPort));
- cList.add("-cluster-net-ip-address");
- cList.add(clusterNetIPAddress);
- cList.add("-node-id");
- cList.add(nodeId);
- cList.add("-data-ip-address");
- cList.add(dataIPAddress);
- cList.add(datasetIPAddress);
- cList.add("-iodevices");
- cList.add(ioDevices);
- cList.add("-net-thread-count");
- cList.add(String.valueOf(nNetThreads));
- cList.add("-max-memory");
- cList.add(String.valueOf(maxMemory));
- cList.add("-result-history-size");
- cList.add(String.valueOf(resultHistorySize));
- cList.add("-result-manager-memory");
- cList.add(String.valueOf(resultManagerMemory));
+ @Argument
+ @Option(name = "--", handler = StopOptionHandler.class)
+ public List<String> appArgs;
- if (appNCMainClass != null) {
- cList.add("-app-nc-main-class");
- cList.add(appNCMainClass);
- }
- if (appArgs != null && !appArgs.isEmpty()) {
- cList.add("--");
- for (String appArg : appArgs) {
- cList.add(appArg);
- }
- }
- }
+ public void toCommandLine(List<String> cList) {
+ cList.add("-cc-host");
+ cList.add(ccHost);
+ cList.add("-cc-port");
+ cList.add(String.valueOf(ccPort));
+ cList.add("-cluster-net-ip-address");
+ cList.add(clusterNetIPAddress);
+ cList.add("-node-id");
+ cList.add(nodeId);
+ cList.add("-data-ip-address");
+ cList.add(dataIPAddress);
+ cList.add(datasetIPAddress);
+ cList.add("-iodevices");
+ cList.add(ioDevices);
+ cList.add("-net-thread-count");
+ cList.add(String.valueOf(nNetThreads));
+ cList.add("-max-memory");
+ cList.add(String.valueOf(maxMemory));
+ cList.add("-result-time-to-live");
+ cList.add(String.valueOf(resultTTL));
+ cList.add("-result-sweep-threshold");
+ cList.add(String.valueOf(resultSweepThreshold));
+ cList.add("-result-manager-memory");
+ cList.add(String.valueOf(resultManagerMemory));
- public void toMap(Map<String, String> configuration) {
- configuration.put("cc-host", ccHost);
- configuration.put("cc-port", (String.valueOf(ccPort)));
- configuration.put("cluster-net-ip-address", clusterNetIPAddress);
- configuration.put("node-id", nodeId);
- configuration.put("data-ip-address", dataIPAddress);
- configuration.put("iodevices", ioDevices);
- configuration.put("net-thread-count", String.valueOf(nNetThreads));
- configuration.put("max-memory", String.valueOf(maxMemory));
- configuration.put("result-history-size",
- String.valueOf(resultHistorySize));
- configuration.put("result-manager-memory",
- String.valueOf(resultManagerMemory));
+ if (appNCMainClass != null) {
+ cList.add("-app-nc-main-class");
+ cList.add(appNCMainClass);
+ }
+ if (appArgs != null && !appArgs.isEmpty()) {
+ cList.add("--");
+ for (String appArg : appArgs) {
+ cList.add(appArg);
+ }
+ }
+ }
- if (appNCMainClass != null) {
- configuration.put("app-nc-main-class", appNCMainClass);
- }
+ public void toMap(Map<String, String> configuration) {
+ configuration.put("cc-host", ccHost);
+ configuration.put("cc-port", (String.valueOf(ccPort)));
+ configuration.put("cluster-net-ip-address", clusterNetIPAddress);
+ configuration.put("node-id", nodeId);
+ configuration.put("data-ip-address", dataIPAddress);
+ configuration.put("iodevices", ioDevices);
+ configuration.put("net-thread-count", String.valueOf(nNetThreads));
+ configuration.put("max-memory", String.valueOf(maxMemory));
+ configuration.put("result-time-to-live", String.valueOf(resultTTL));
+ configuration.put("result-sweep-threshold", String.valueOf(resultSweepThreshold));
+ configuration.put("result-manager-memory", String.valueOf(resultManagerMemory));
- }
+ if (appNCMainClass != null) {
+ configuration.put("app-nc-main-class", appNCMainClass);
+ }
+
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
new file mode 100644
index 0000000..69b560c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.common.dataset;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataset.IDatasetManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+/**
+ * Sweeper to clean up the stale result distribution files and result states.
+ */
+public class ResultStateSweeper implements Runnable {
+ private static final Logger LOGGER = Logger.getLogger(ResultStateSweeper.class.getName());
+
+ private final IDatasetManager datasetManager;
+
+ private final long resultTTL;
+
+ private final long resultSweepThreshold;
+
+ private final List<JobId> toBeCollected;
+
+ public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
+ this.datasetManager = datasetManager;
+ this.resultTTL = resultTTL;
+ this.resultSweepThreshold = resultSweepThreshold;
+ toBeCollected = new ArrayList<JobId>();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(resultSweepThreshold);
+ sweep();
+ } catch (InterruptedException e) {
+ LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
+ // There isn't much we can do really here
+ }
+ }
+
+ }
+
+ private void sweep() {
+ synchronized (datasetManager) {
+ toBeCollected.clear();
+ for (Map.Entry<JobId, IDatasetStateRecord> entry : datasetManager.getStateMap().entrySet()) {
+ if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
+ toBeCollected.add(entry.getKey());
+ }
+ }
+ for (JobId jobId : toBeCollected) {
+ datasetManager.deinitState(jobId);
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Result state cleanup instance successfully completed.");
+ }
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 37b64a7..60859e3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -217,7 +217,7 @@
private void init() throws Exception {
ctx.getIOManager().setExecutor(executor);
datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
- ncConfig.resultHistorySize);
+ ncConfig.resultTTL, ncConfig.resultSweepThreshold);
datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
datasetPartitionManager, ncConfig.nNetThreads);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
index e8ef169..d9f7302 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -21,7 +21,6 @@
import java.util.Map;
import java.util.Set;
-import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index fe48c17..325e2bc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -24,10 +24,12 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.dataset.ResultStateSweeper;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -40,7 +42,7 @@
private final Executor executor;
- private final Map<JobId, Map<ResultSetId, ResultState[]>> partitionResultStateMap;
+ private final Map<JobId, IDatasetStateRecord> partitionResultStateMap;
private final DefaultDeallocatableRegistry deallocatableRegistry;
@@ -48,8 +50,8 @@
private final DatasetMemoryManager datasetMemoryManager;
- public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory,
- final int resultHistorySize) {
+ public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
+ long resultSweepThreshold) {
this.ncs = ncs;
this.executor = executor;
deallocatableRegistry = new DefaultDeallocatableRegistry();
@@ -59,35 +61,25 @@
} else {
datasetMemoryManager = null;
}
- partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
- private static final long serialVersionUID = 1L;
-
- protected boolean removeEldestEntry(Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
- synchronized (DatasetPartitionManager.this) {
- if (size() > resultHistorySize) {
- deinitState(eldest);
- return true;
- }
- return false;
- }
- }
- };
+ partitionResultStateMap = new LinkedHashMap<JobId, IDatasetStateRecord>();
+ executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
}
@Override
public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions) throws HyracksException {
+ boolean asyncMode, int partition, int nPartitions) throws HyracksException {
DatasetPartitionWriter dpw = null;
JobId jobId = ctx.getJobletContext().getJobId();
try {
synchronized (this) {
ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
- dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager, fileFactory);
+ dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, partition, datasetMemoryManager,
+ fileFactory);
- Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
if (rsIdMap == null) {
- rsIdMap = new HashMap<ResultSetId, ResultState[]>();
+ rsIdMap = new ResultSetMap();
partitionResultStateMap.put(jobId, rsIdMap);
}
@@ -133,7 +125,7 @@
IFrameWriter writer) throws HyracksException {
ResultState resultState;
synchronized (this) {
- Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
if (rsIdMap == null) {
throw new HyracksException("Unknown JobId " + jobId);
@@ -150,15 +142,44 @@
}
}
- DatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+ DatasetPartitionReader dpr = new DatasetPartitionReader(this, datasetMemoryManager, executor, resultState);
dpr.writeTo(writer);
LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":ResultSetId: " + resultSetId + ":partition: "
+ partition);
}
@Override
+ public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
+ if (rsIdMap != null) {
+ ResultState[] resultStates = rsIdMap.get(resultSetId);
+ if (resultStates != null) {
+ ResultState state = resultStates[partition];
+ if (state != null) {
+ state.closeAndDelete();
+ LOGGER.fine("Removing partition: " + partition + " for JobId: " + jobId);
+ }
+ resultStates[partition] = null;
+ boolean stateEmpty = true;
+ for (int i = 0; i < resultStates.length; i++) {
+ if (resultStates[i] != null) {
+ stateEmpty = false;
+ break;
+ }
+ }
+ if (stateEmpty) {
+ rsIdMap.remove(resultSetId);
+ }
+ }
+ if (rsIdMap.isEmpty()) {
+ partitionResultStateMap.remove(jobId);
+ }
+ }
+ }
+
+ @Override
public synchronized void abortReader(JobId jobId) {
- Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
if (rsIdMap == null) {
return;
@@ -183,14 +204,25 @@
@Override
public synchronized void close() {
- for (Entry<JobId, Map<ResultSetId, ResultState[]>> entry : partitionResultStateMap.entrySet()) {
- deinitState(entry);
+ for (Entry<JobId, IDatasetStateRecord> entry : partitionResultStateMap.entrySet()) {
+ deinit(entry.getKey());
}
deallocatableRegistry.close();
}
- public void deinitState(Entry<JobId, Map<ResultSetId, ResultState[]>> entry) {
- Map<ResultSetId, ResultState[]> rsIdMap = entry.getValue();
+ @Override
+ public Map<JobId, IDatasetStateRecord> getStateMap() {
+ return partitionResultStateMap;
+ }
+
+ @Override
+ public void deinitState(JobId jobId) {
+ deinit(jobId);
+ partitionResultStateMap.remove(jobId);
+ }
+
+ private void deinit(JobId jobId) {
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
if (rsIdMap != null) {
for (ResultSetId rsId : rsIdMap.keySet()) {
ResultState[] resultStates = rsIdMap.get(rsId);
@@ -199,11 +231,26 @@
ResultState state = resultStates[i];
if (state != null) {
state.closeAndDelete();
- LOGGER.fine("Removing partition: " + i + " for JobId: " + entry.getKey());
+ LOGGER.fine("Removing partition: " + i + " for JobId: " + jobId);
}
}
}
}
}
}
+
+ private class ResultSetMap extends HashMap<ResultSetId, ResultState[]> implements IDatasetStateRecord {
+ private static final long serialVersionUID = 1L;
+
+ long timestamp;
+
+ public ResultSetMap() {
+ super();
+ timestamp = System.currentTimeMillis();
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+ }
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index 9cab183..6d6fc83 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -26,13 +26,17 @@
public class DatasetPartitionReader {
private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
+ private final DatasetPartitionManager datasetPartitionManager;
+
private final DatasetMemoryManager datasetMemoryManager;
private final Executor executor;
private final ResultState resultState;
- public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
+ public DatasetPartitionReader(DatasetPartitionManager datasetPartitionManager,
+ DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
+ this.datasetPartitionManager = datasetPartitionManager;
this.datasetMemoryManager = datasetMemoryManager;
this.executor = executor;
this.resultState = resultState;
@@ -66,6 +70,12 @@
} finally {
channel.close();
resultState.readClose();
+ // If the query is a synchronous query, remove its partition as soon as it is read.
+ if (!resultState.getAsyncMode()) {
+ datasetPartitionManager.removePartition(resultState.getResultSetPartitionId().getJobId(),
+ resultState.getResultSetPartitionId().getResultSetId(), resultState
+ .getResultSetPartitionId().getPartition());
+ }
}
} catch (HyracksDataException e) {
throw new RuntimeException(e);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 58f0806..d61da67 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -37,6 +37,8 @@
private final ResultSetId resultSetId;
+ private final boolean asyncMode;
+
private final int partition;
private final DatasetMemoryManager datasetMemoryManager;
@@ -46,16 +48,18 @@
private final ResultState resultState;
public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
- ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager,
+ ResultSetId rsId, boolean asyncMode, int partition, DatasetMemoryManager datasetMemoryManager,
IWorkspaceFileFactory fileFactory) {
this.manager = manager;
this.jobId = jobId;
this.resultSetId = rsId;
+ this.asyncMode = asyncMode;
this.partition = partition;
this.datasetMemoryManager = datasetMemoryManager;
resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
- resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), fileFactory, ctx.getFrameSize());
+ resultState = new ResultState(resultSetPartitionId, asyncMode, ctx.getIOManager(), fileFactory,
+ ctx.getFrameSize());
}
public ResultState getResultState() {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/Page.java
similarity index 94%
rename from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
rename to hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/Page.java
index aad5b55..4387e2b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/Page.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.api.dataset;
+package edu.uci.ics.hyracks.control.nc.dataset;
import java.nio.ByteBuffer;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 277e241..c24034d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -23,7 +23,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
-import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IFileHandle;
@@ -37,6 +36,8 @@
private final ResultSetPartitionId resultSetPartitionId;
+ private final boolean asyncMode;
+
private final int frameSize;
private final IIOManager ioManager;
@@ -59,9 +60,10 @@
private long persistentSize;
- ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, IWorkspaceFileFactory fileFactory,
- int frameSize) {
+ ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, IIOManager ioManager,
+ IWorkspaceFileFactory fileFactory, int frameSize) {
this.resultSetPartitionId = resultSetPartitionId;
+ this.asyncMode = asyncMode;
this.ioManager = ioManager;
this.fileFactory = fileFactory;
this.frameSize = frameSize;
@@ -251,6 +253,10 @@
return ioManager;
}
+ public boolean getAsyncMode() {
+ return asyncMode;
+ }
+
@Override
public JobId getJobId() {
return resultSetPartitionId.getJobId();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index f1bd693..7330590 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -42,13 +42,16 @@
private final boolean ordered;
+ private final boolean asyncMode;
+
private final IResultSerializerFactory resultSerializerFactory;
public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
- IResultSerializerFactory resultSerializerFactory) throws IOException {
+ boolean asyncMode, IResultSerializerFactory resultSerializerFactory) throws IOException {
super(spec, 1, 0);
this.rsId = rsId;
this.ordered = ordered;
+ this.asyncMode = asyncMode;
this.resultSerializerFactory = resultSerializerFactory;
}
@@ -75,7 +78,7 @@
@Override
public void open() throws HyracksDataException {
try {
- datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, partition,
+ datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
nPartitions);
datasetPartitionWriter.open();
resultSerializer.init();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
index 07b57c8..0a5f2c5 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
@@ -94,7 +94,7 @@
throws IOException {
ResultSetId rsId = new ResultSetId(1);
- AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
spec.addResultSetId(rsId);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index cee8a0f..906a750 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -97,7 +97,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -169,7 +169,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
spec.addResultSetId(rsId);
@@ -242,7 +242,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
spec.addResultSetId(rsId);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 7c8048d..42c823a 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -205,7 +205,7 @@
throws IOException {
ResultSetId rsId = new ResultSetId(1);
- AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
spec.addResultSetId(rsId);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index 845cd50..380ba29 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -80,7 +80,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -135,7 +135,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
index 9c93874..c42dc75 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
@@ -66,7 +66,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
spec.addResultSetId(rsId);
@@ -102,7 +102,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
spec.addResultSetId(rsId);
@@ -141,7 +141,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
spec.addResultSetId(rsId);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 16179da..48db176 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -78,7 +78,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -125,7 +125,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
index 6494b28..8446b83 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
@@ -90,7 +90,7 @@
ResultSetId rsId = new ResultSetId(i);
spec.addResultSetId(rsId);
- outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true,
+ outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 2e8997d..734c3da 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -128,7 +128,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -211,7 +211,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -294,7 +294,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -378,7 +378,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -466,7 +466,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -554,7 +554,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -635,7 +635,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -726,7 +726,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -817,7 +817,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -904,7 +904,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -997,7 +997,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index ba4404f..daab141 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -176,7 +176,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -253,7 +253,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -330,7 +330,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -412,7 +412,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
index b338601..ef31508 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
@@ -70,7 +70,7 @@
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);