reduced communication during result distribution
- when reporting the location of results, the NCs also report if the
result partition is empty
- the client does not try to read empty partitions
better toString() for subclasses of AbstractWork
Change-Id: Ia39f657e689ea305d49d55bd27c9a512e1ff970f
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/39
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java
index e141d06..019d018 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,6 +34,8 @@
private Status status;
+ private boolean empty;
+
public DatasetDirectoryRecord() {
this.address = null;
this.readEOS = false;
@@ -48,6 +50,14 @@
return address;
}
+ public void setEmpty(boolean empty) {
+ this.empty = empty;
+ }
+
+ public boolean getEmpty() {
+ return empty;
+ }
+
public void readEOS() {
this.readEOS = true;
}
@@ -82,4 +92,9 @@
}
return address.equals(((DatasetDirectoryRecord) o).address);
}
+
+ @Override
+ public String toString() {
+ return address.toString() + " " + status + (empty ? " (empty)" : "") + (readEOS ? " (EOS)" : "");
+ }
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
index b6cd1af..60366d3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -14,12 +14,15 @@
*/
package edu.uci.ics.hyracks.api.dataset;
-import java.util.Map;
+import java.util.Set;
import edu.uci.ics.hyracks.api.job.JobId;
public interface IDatasetManager {
- public Map<JobId, IDatasetStateRecord> getStateMap();
+
+ public Set<JobId> getJobIds();
+
+ public IDatasetStateRecord getState(JobId jobId);
public void deinitState(JobId jobId);
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index 32910ac..4b7d27a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -24,6 +24,9 @@
public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
boolean asyncMode, int partition, int nPartitions) throws HyracksException;
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
+ boolean orderedResult, boolean emptyResult) throws HyracksException;
+
public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition)
throws HyracksException;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 51e4950..187358c 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -89,64 +89,54 @@
return status;
}
+ private DatasetDirectoryRecord getRecord(int partition) throws Exception {
+ while (knownRecords == null || knownRecords[partition] == null) {
+ knownRecords = datasetDirectoryServiceConnection
+ .getDatasetResultLocations(jobId, resultSetId, knownRecords);
+ }
+ return knownRecords[partition];
+ }
+
+ private boolean nextPartition() throws HyracksDataException {
+ ++lastReadPartition;
+ try {
+ DatasetDirectoryRecord record = getRecord(lastReadPartition);
+ while (record.getEmpty() && (++lastReadPartition) < knownRecords.length) {
+ record = getRecord(lastReadPartition);
+ }
+ if (lastReadPartition == knownRecords.length) {
+ return false;
+ }
+ resultChannel = new DatasetNetworkInputChannel(netManager, getSocketAddress(record), jobId, resultSetId,
+ lastReadPartition, NUM_READ_BUFFERS);
+ lastMonitor = getMonitor(lastReadPartition);
+ resultChannel.registerMonitor(lastMonitor);
+ resultChannel.open(datasetClientCtx);
+ return true;
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
@Override
public int read(ByteBuffer buffer) throws HyracksDataException {
ByteBuffer readBuffer;
int readSize = 0;
if (lastReadPartition == -1) {
- while (knownRecords == null || knownRecords[0] == null) {
- try {
- knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId, resultSetId,
- knownRecords);
- lastReadPartition = 0;
- resultChannel = new DatasetNetworkInputChannel(netManager,
- getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId, lastReadPartition,
- NUM_READ_BUFFERS);
- lastMonitor = getMonitor(lastReadPartition);
- resultChannel.registerMonitor(lastMonitor);
- resultChannel.open(datasetClientCtx);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
+ if (!nextPartition()) {
+ return readSize;
}
}
- while (readSize <= 0 && !(isLastPartitionReadComplete())) {
- synchronized (lastMonitor) {
- while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached() && !lastMonitor.failed()) {
- try {
- lastMonitor.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- if (lastMonitor.failed()) {
- throw new HyracksDataException("Job Failed.");
- }
+ while (readSize <= 0
+ && !((lastReadPartition == knownRecords.length - 1) && isPartitionReadComplete(lastMonitor))) {
+ waitForNextFrame(lastMonitor);
if (isPartitionReadComplete(lastMonitor)) {
knownRecords[lastReadPartition].readEOS();
- if ((lastReadPartition == knownRecords.length - 1)) {
+ resultChannel.close();
+ if ((lastReadPartition == knownRecords.length - 1) || !nextPartition()) {
break;
- } else {
- try {
- lastReadPartition++;
- while (knownRecords[lastReadPartition] == null) {
- knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
- resultSetId, knownRecords);
- }
-
- resultChannel = new DatasetNetworkInputChannel(netManager,
- getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId,
- lastReadPartition, NUM_READ_BUFFERS);
- lastMonitor = getMonitor(lastReadPartition);
- resultChannel.registerMonitor(lastMonitor);
- resultChannel.open(datasetClientCtx);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
}
} else {
readBuffer = resultChannel.getNextBuffer();
@@ -163,26 +153,25 @@
return readSize;
}
- private boolean nullExists(DatasetDirectoryRecord[] locations) {
- if (locations == null) {
- return true;
- }
- for (int i = 0; i < locations.length; i++) {
- if (locations[i] == null) {
- return true;
+ private static void waitForNextFrame(IDatasetInputChannelMonitor monitor) throws HyracksDataException {
+ synchronized (monitor) {
+ while (monitor.getNFramesAvailable() <= 0 && !monitor.eosReached() && !monitor.failed()) {
+ try {
+ monitor.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
}
}
- return false;
+ if (monitor.failed()) {
+ throw new HyracksDataException("Job Failed.");
+ }
}
private boolean isPartitionReadComplete(IDatasetInputChannelMonitor monitor) {
return (monitor.getNFramesAvailable() <= 0) && (monitor.eosReached());
}
- private boolean isLastPartitionReadComplete() {
- return ((lastReadPartition == knownRecords.length - 1) && isPartitionReadComplete(lastMonitor));
- }
-
private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
NetworkAddress netAddr = addr.getNetworkAddress();
return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index b665a4f..c994dfb 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -513,8 +513,8 @@
case REGISTER_RESULT_PARTITION_LOCATION: {
CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
- .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getPartition(), rrplf
- .getNPartitions(), rrplf.getNetworkAddress()));
+ .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
+ rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
return;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index ee1cc67..66e7e7a 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,9 +15,11 @@
package edu.uci.ics.hyracks.control.cc.dataset;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
@@ -32,6 +34,7 @@
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.dataset.ResultStateSweeper;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
/**
* TODO(madhusudancs): The potential perils of this global dataset directory service implementation is that, the jobs
@@ -45,12 +48,12 @@
private final long resultSweepThreshold;
- private final Map<JobId, IDatasetStateRecord> jobResultLocations;
+ private final Map<JobId, JobResultInfo> jobResultLocations;
public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
- jobResultLocations = new LinkedHashMap<JobId, IDatasetStateRecord>();
+ jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
}
@Override
@@ -61,10 +64,10 @@
@Override
public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
throws HyracksException {
- DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+ DatasetJobRecord djr = getDatasetJobRecord(jobId);
if (djr == null) {
djr = new DatasetJobRecord();
- jobResultLocations.put(jobId, djr);
+ jobResultLocations.put(jobId, new JobResultInfo(djr, null));
}
}
@@ -78,10 +81,15 @@
// Auto-generated method stub
}
+ private DatasetJobRecord getDatasetJobRecord(JobId jobId) {
+ final JobResultInfo jri = jobResultLocations.get(jobId);
+ return jri == null ? null : jri.record;
+ }
+
@Override
public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions, NetworkAddress networkAddress) {
- DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+ boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
+ DatasetJobRecord djr = getDatasetJobRecord(jobId);
ResultSetMetaData resultSetMetaData = djr.get(rsId);
if (resultSetMetaData == null) {
@@ -94,7 +102,22 @@
records[partition] = new DatasetDirectoryRecord();
}
records[partition].setNetworkAddress(networkAddress);
+ records[partition].setEmpty(emptyResult);
records[partition].start();
+
+ Waiters waiters = jobResultLocations.get(jobId).waiters;
+ Waiter waiter = waiters != null ? waiters.get(rsId) : null;
+ if (waiter != null) {
+ try {
+ DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, waiter.knownRecords);
+ if (updatedRecords != null) {
+ waiters.remove(rsId);
+ waiter.callback.setValue(updatedRecords);
+ }
+ } catch (Exception e) {
+ waiter.callback.setException(e);
+ }
+ }
notifyAll();
}
@@ -102,7 +125,7 @@
public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
int successCount = 0;
- DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+ DatasetJobRecord djr = getDatasetJobRecord(jobId);
ResultSetMetaData resultSetMetaData = djr.get(rsId);
DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
records[partition].writeEOS();
@@ -120,26 +143,38 @@
@Override
public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
- DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+ DatasetJobRecord djr = getDatasetJobRecord(jobId);
if (djr != null) {
djr.fail();
}
+ final Waiters waiters = jobResultLocations.get(jobId).waiters;
+ if (waiters != null) {
+ waiters.get(rsId).callback.setException(new Exception());
+ waiters.remove(rsId);
+ }
notifyAll();
}
@Override
public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
- DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+ DatasetJobRecord djr = getDatasetJobRecord(jobId);
if (djr != null) {
djr.fail(exceptions);
}
+ final Waiters waiters = jobResultLocations.get(jobId).waiters;
+ if (waiters != null) {
+ for (ResultSetId rsId : waiters.keySet()) {
+ waiters.get(rsId).callback.setException(exceptions.get(0));
+ waiters.remove(rsId);
+ }
+ }
notifyAll();
}
@Override
public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
DatasetJobRecord djr;
- while ((djr = (DatasetJobRecord) jobResultLocations.get(jobId)) == null) {
+ while ((djr = getDatasetJobRecord(jobId)) == null) {
try {
wait();
} catch (InterruptedException e) {
@@ -151,8 +186,13 @@
}
@Override
- public Map<JobId, IDatasetStateRecord> getStateMap() {
- return jobResultLocations;
+ public Set<JobId> getJobIds() {
+ return jobResultLocations.keySet();
+ }
+
+ @Override
+ public IDatasetStateRecord getState(JobId jobId) {
+ return jobResultLocations.get(jobId).record;
}
@Override
@@ -161,39 +201,35 @@
}
@Override
- public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
- DatasetDirectoryRecord[] newRecords;
- while ((newRecords = updatedRecords(jobId, rsId, knownRecords)) == null) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ public synchronized void getResultPartitionLocations(JobId jobId, ResultSetId rsId,
+ DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback)
+ throws HyracksDataException {
+ DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, knownRecords);
+ if (updatedRecords == null) {
+ JobResultInfo jri = jobResultLocations.get(jobId);
+ Waiters waiters;
+ if (jri == null) {
+ waiters = new Waiters();
+ jri = new JobResultInfo(null, waiters);
+ jobResultLocations.put(jobId, jri);
+ } else {
+ waiters = jri.waiters;
+ if (waiters == null) {
+ waiters = new Waiters();
+ jri.waiters = waiters;
+ }
}
+ waiters.put(rsId, new Waiter(knownRecords, callback));
+ } else {
+ callback.setValue(updatedRecords);
}
- return newRecords;
}
/**
* Compares the records already known by the client for the given job's result set id with the records that the
* dataset directory service knows and if there are any newly discovered records returns a whole array with the
* new records filled in.
- * This method has a very convoluted logic. Here is the explanation of how it works.
- * If the ordering constraint has to be enforced, the method obtains the first null record in the known records in
- * the order of the partitions. It always traverses the array in the first to last order!
- * If known records array or the first element in that array is null in the but the record for that partition now
- * known to the directory service, the method fills in that record in the array and returns the array back.
- * However, if the first known null record is not a first element in the array, by induction, all the previous
- * known records should be known already be known to client and none of the records for the partitions ahead is
- * known by the client yet. So, we check if the client has reached the end of stream for the partition corresponding
- * to the record before the first known null record, i.e. the last known non-null record. If not, we just return
- * null because we cannot expose any new locations until the client reaches end of stream for the last known record.
- * If the client has reached the end of stream record for the last known non-null record, we check if the next record
- * is discovered by the dataset directory service and if so, we fill it in the records array and return it back or
- * send null otherwise.
- * If the ordering is not required, we are free to return any newly discovered records back, so we just check if
- * arrays are equal and if they are not we send the entire new updated array.
- *
+ *
* @param jobId
* - Id of the job for which the directory records should be retrieved.
* @param rsId
@@ -201,14 +237,14 @@
* @param knownRecords
* - An array of directory records that the client is already aware of.
* @return
- * - Returns null if there aren't any newly discovered partitions enforcing the ordering constraint
+ * Returns the updated records if new record were discovered, null otherwise
* @throws HyracksDataException
* TODO(madhusudancs): Think about caching (and still be stateless) instead of this ugly O(n) iterations for
* every check. This already looks very expensive.
*/
private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
throws HyracksDataException {
- DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+ DatasetJobRecord djr = getDatasetJobRecord(jobId);
if (djr == null) {
throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
@@ -228,36 +264,32 @@
return null;
}
- boolean ordered = resultSetMetaData.getOrderedResult();
DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
- /* If ordering is required, we should expose the dataset directory records only in the order, otherwise
- * we can simply check if there are any newly discovered records and send the whole array back if there are.
- */
- if (ordered) {
- // Iterate over the known records and find the last record which is not null.
- int i = 0;
- for (i = 0; i < records.length; i++) {
- if (knownRecords == null) {
- if (records[0] != null) {
- knownRecords = new DatasetDirectoryRecord[records.length];
- knownRecords[0] = records[0];
- return knownRecords;
- }
- return null;
- }
- if (knownRecords[i] == null) {
- if ((i == 0 || knownRecords[i - 1].hasReachedReadEOS()) && records[i] != null) {
- knownRecords[i] = records[i];
- return knownRecords;
- }
- return null;
- }
- }
- } else {
- if (!Arrays.equals(records, knownRecords)) {
- return records;
- }
- }
- return null;
+
+ return Arrays.equals(records, knownRecords) ? null : records;
}
-}
\ No newline at end of file
+}
+
+class JobResultInfo {
+ JobResultInfo(DatasetJobRecord record, Waiters waiters) {
+ this.record = record;
+ this.waiters = waiters;
+ }
+
+ DatasetJobRecord record;
+ Waiters waiters;
+}
+
+class Waiters extends HashMap<ResultSetId, Waiter> {
+ private static final long serialVersionUID = 1L;
+}
+
+class Waiter {
+ Waiter(DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
+ this.knownRecords = knownRecords;
+ this.callback = callback;
+ }
+
+ DatasetDirectoryRecord[] knownRecords;
+ IResultCallback<DatasetDirectoryRecord[]> callback;
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 3de3c50..30d1d15 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -25,12 +25,13 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
public interface IDatasetDirectoryService extends IJobLifecycleListener, IDatasetManager {
public void init(ExecutorService executor);
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
- int nPartitions, NetworkAddress networkAddress);
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress);
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition);
@@ -40,6 +41,6 @@
public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
- public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
+ public void getResultPartitionLocations(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownLocations,
+ IResultCallback<DatasetDirectoryRecord[]> callback) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
index c4d202f..842c7c8 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -61,6 +61,6 @@
@Override
public String toString() {
- return "nodeID: " + nodeId;
+ return getName() + ": nodeID: " + nodeId;
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
index fd6b690..2bed845 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
+import java.util.Arrays;
+
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -50,13 +52,16 @@
@Override
public void run() {
try {
- DatasetDirectoryRecord[] partitionLocations = dds.getResultPartitionLocations(jobId, rsId,
- knownRecords);
- callback.setValue(partitionLocations);
+ dds.getResultPartitionLocations(jobId, rsId, knownRecords, callback);
} catch (HyracksDataException e) {
callback.setException(e);
}
}
});
}
-}
\ No newline at end of file
+
+ @Override
+ public String toString() {
+ return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Known@" + Arrays.toString(knownRecords);
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
index 7aca3ff..adce1ef 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,6 +51,6 @@
@Override
public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId;
+ return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId;
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 46a7c16..340134e 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -123,4 +123,10 @@
}
return jobLogObject;
}
-}
\ No newline at end of file
+
+ @Override
+ public String toString() {
+ return getName() + ": JobId@" + jobId + " Status@" + status
+ + (exceptions == null ? "" : " Exceptions@" + exceptions);
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index 92a0f5a..3e6be30 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -57,6 +57,6 @@
@Override
public String toString() {
- return "PartitionAvailable@" + partitionDescriptor;
+ return getName() + ": " + partitionDescriptor;
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java
index 3aa7e41..bb0d013 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,6 +54,6 @@
@Override
public String toString() {
- return "PartitionRequest@" + partitionRequest;
+ return getName() + ": " + partitionRequest;
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 564d1d3..f87a7fa 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,6 +29,8 @@
private final boolean orderedResult;
+ private final boolean emptyResult;
+
private final int partition;
private final int nPartitions;
@@ -36,11 +38,12 @@
private final NetworkAddress networkAddress;
public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- boolean orderedResult, int partition, int nPartitions, NetworkAddress networkAddress) {
+ boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
this.ccs = ccs;
this.jobId = jobId;
this.rsId = rsId;
this.orderedResult = orderedResult;
+ this.emptyResult = emptyResult;
this.partition = partition;
this.nPartitions = nPartitions;
this.networkAddress = networkAddress;
@@ -48,13 +51,13 @@
@Override
public void run() {
- ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
- nPartitions, networkAddress);
+ ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
+ partition, nPartitions, networkAddress);
}
@Override
public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
- + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult;
+ return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
+ + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult + " EmptyResult@" + emptyResult;
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
index 5ea171e..fe2bb3a 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -43,6 +43,6 @@
@Override
public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
+ return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
index 8e5050c..d8d00f5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -57,6 +57,6 @@
@Override
public String toString() {
- return "TaskCompleteEvent@[" + nodeId + "[" + jobId + ":" + taId + "]";
+ return getName() + ": [" + nodeId + "[" + jobId + ":" + taId + "]";
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index e00025d..d6938d9 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -42,6 +42,6 @@
@Override
public String toString() {
- return "TaskFailureEvent[" + jobId + ":" + taId + ":" + nodeId + "]";
+ return getName() + ": [" + jobId + ":" + taId + ":" + nodeId + "]";
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 926c83f..659dc23 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -56,7 +56,7 @@
public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, boolean emptyResult, int partition,
int nPartitions, NetworkAddress networkAddress) throws Exception;
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
index 5a6d849..a5f0d8f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,12 +17,10 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.dataset.IDatasetManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
import edu.uci.ics.hyracks.api.job.JobId;
/**
@@ -64,9 +62,9 @@
private void sweep() {
synchronized (datasetManager) {
toBeCollected.clear();
- for (Map.Entry<JobId, IDatasetStateRecord> entry : datasetManager.getStateMap().entrySet()) {
- if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
- toBeCollected.add(entry.getKey());
+ for (JobId jobId : datasetManager.getJobIds()) {
+ if (System.currentTimeMillis() > datasetManager.getState(jobId).getTimestamp() + resultTTL) {
+ toBeCollected.add(jobId);
}
}
for (JobId jobId : toBeCollected) {
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 9077cd8..6be2294 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -456,6 +456,8 @@
private final boolean orderedResult;
+ private final boolean emptyResult;
+
private final int partition;
private final int nPartitions;
@@ -463,10 +465,11 @@
private NetworkAddress networkAddress;
public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions, NetworkAddress networkAddress) {
+ boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
this.jobId = jobId;
this.rsId = rsId;
this.orderedResult = orderedResult;
+ this.emptyResult = emptyResult;
this.partition = partition;
this.nPartitions = nPartitions;
this.networkAddress = networkAddress;
@@ -489,6 +492,10 @@
return orderedResult;
}
+ public boolean getEmptyResult() {
+ return emptyResult;
+ }
+
public int getPartition() {
return partition;
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index b52d0ae..6bb1a93 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -112,10 +112,10 @@
ipcHandle.send(-1, fn, null);
}
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, boolean emptyResult, int partition,
int nPartitions, NetworkAddress networkAddress) throws Exception {
CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
- jobId, rsId, orderedResult, partition, nPartitions, networkAddress);
+ jobId, rsId, orderedResult, emptyResult, partition, nPartitions, networkAddress);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java
index adffba1..7203a36 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,4 +20,15 @@
public Level logLevel() {
return Level.INFO;
}
-}
\ No newline at end of file
+
+ public String getName() {
+ final String className = getClass().getName();
+ final int endIndex = className.endsWith("Work") ? className.length() - 4 : className.length();
+ return className.substring(className.lastIndexOf('.') + 1, endIndex);
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 325e2bc..b1f4147 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
@@ -70,28 +71,22 @@
boolean asyncMode, int partition, int nPartitions) throws HyracksException {
DatasetPartitionWriter dpw = null;
JobId jobId = ctx.getJobletContext().getJobId();
- try {
- synchronized (this) {
- ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
- nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
- dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, partition, datasetMemoryManager,
- fileFactory);
+ synchronized (this) {
+ dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
+ datasetMemoryManager, fileFactory);
- ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
- if (rsIdMap == null) {
- rsIdMap = new ResultSetMap();
- partitionResultStateMap.put(jobId, rsIdMap);
- }
-
- ResultState[] resultStates = rsIdMap.get(rsId);
- if (resultStates == null) {
- resultStates = new ResultState[nPartitions];
- rsIdMap.put(rsId, resultStates);
- }
- resultStates[partition] = dpw.getResultState();
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
+ if (rsIdMap == null) {
+ rsIdMap = new ResultSetMap();
+ partitionResultStateMap.put(jobId, rsIdMap);
}
- } catch (Exception e) {
- throw new HyracksException(e);
+
+ ResultState[] resultStates = rsIdMap.get(rsId);
+ if (resultStates == null) {
+ resultStates = new ResultState[nPartitions];
+ rsIdMap.put(rsId, resultStates);
+ }
+ resultStates[partition] = dpw.getResultState();
}
LOGGER.fine("Initialized partition writer: JobId: " + jobId + ":partition: " + partition);
@@ -99,6 +94,17 @@
}
@Override
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
+ boolean orderedResult, boolean emptyResult) throws HyracksException {
+ try {
+ ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
+ partition, nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
try {
LOGGER.fine("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
@@ -211,8 +217,13 @@
}
@Override
- public Map<JobId, IDatasetStateRecord> getStateMap() {
- return partitionResultStateMap;
+ public Set<JobId> getJobIds() {
+ return partitionResultStateMap.keySet();
+ }
+
+ @Override
+ public IDatasetStateRecord getState(JobId jobId) {
+ return partitionResultStateMap.get(jobId);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index d61da67..d3a6fb5 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -39,22 +39,30 @@
private final boolean asyncMode;
+ private final boolean orderedResult;
+
private final int partition;
+ private final int nPartitions;
+
private final DatasetMemoryManager datasetMemoryManager;
private final ResultSetPartitionId resultSetPartitionId;
private final ResultState resultState;
+ private boolean partitionRegistered;
+
public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
- ResultSetId rsId, boolean asyncMode, int partition, DatasetMemoryManager datasetMemoryManager,
- IWorkspaceFileFactory fileFactory) {
+ ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
+ DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory) {
this.manager = manager;
this.jobId = jobId;
this.resultSetId = rsId;
this.asyncMode = asyncMode;
+ this.orderedResult = orderedResult;
this.partition = partition;
+ this.nPartitions = nPartitions;
this.datasetMemoryManager = datasetMemoryManager;
resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
@@ -71,11 +79,16 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("open(" + partition + ")");
}
+ partitionRegistered = false;
resultState.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (!partitionRegistered) {
+ registerResultPartitionLocation(false);
+ partitionRegistered = true;
+ }
if (datasetMemoryManager == null) {
resultState.write(buffer);
} else {
@@ -99,12 +112,28 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("close(" + partition + ")");
}
-
+ if (!partitionRegistered) {
+ registerResultPartitionLocation(true);
+ partitionRegistered = true;
+ }
+ resultState.close();
try {
- resultState.close();
manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
} catch (HyracksException e) {
throw new HyracksDataException(e);
}
}
+
+ void registerResultPartitionLocation(boolean empty) throws HyracksDataException {
+ try {
+ manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult, empty);
+ } catch (HyracksException e) {
+ if (e instanceof HyracksDataException) {
+ throw (HyracksDataException) e;
+ } else {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
index 41069d8..d08cd0a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -59,6 +59,6 @@
@Override
public String toString() {
- return "nodeID: " + nodeId;
+ return getName() + ": nodeID: " + nodeId;
}
}
\ No newline at end of file