Merge branch 'fullstack_lsm_staging' into madhusudancs/parallelize_result_distribution.
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index 2f53f9b..2f7c0ed 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -21,9 +21,11 @@
// TODO type safe list of expressions
private List<Mutable<ILogicalExpression>> mergeExpressions;
private LogicalVariable partitioningVariable;
+ private boolean global;
public AggregateOperator(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) {
super(variables, expressions);
+ global = false;
}
@Override
@@ -77,6 +79,14 @@
return partitioningVariable;
}
+ public void setGlobal() {
+ global = true;
+ }
+
+ public boolean isGlobal() {
+ return global;
+ }
+
@Override
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 81dc2c2..5d490e9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -70,9 +71,14 @@
AggregateOperator aggOp = (AggregateOperator) op;
if (aggOp.getExecutionMode() == ExecutionMode.PARTITIONED && aggOp.getPartitioningVariable() != null) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
- Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
- partitioningVariables.add(aggOp.getPartitioningVariable());
- pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null), null);
+ if (aggOp.isGlobal()) {
+ pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
+ } else {
+ Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
+ partitioningVariables.add(aggOp.getPartitioningVariable());
+ pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null),
+ null);
+ }
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
} else {
return emptyUnaryRequirements();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 302d4d2..9a96319 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -100,10 +100,9 @@
context, columns);
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
- resultOp.getDataSink(), columns, pf, inputDesc, false, spec);
+ resultOp.getDataSink(), columns, pf, inputDesc, true, spec);
builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
- builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
ILogicalOperator src = resultOp.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, resultOp, 0);
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
similarity index 68%
rename from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
rename to algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
index 8f5ed64..ca15346 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
@@ -3,19 +3,25 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.api.dataset;
+package edu.uci.ics.hyracks.algebricks.core.algebra.properties;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+public class ResultSetDomain implements INodeDomain {
+ @Override
+ public boolean sameAs(INodeDomain domain) {
+ return true;
+ }
-public interface IDatasetPartitionReader {
- public void writeTo(IFrameWriter writer);
+ @Override
+ public Integer cardinality() {
+ return 0;
+ }
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
index 08271c1..30ab542 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -120,6 +120,7 @@
context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
initAgg.setPartitioningVariable(trueVar);
initAgg.getInputs().get(0).setValue(trueAssignOp);
+ initAgg.setGlobal();
}
return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
} else {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index ae38c7f..0c5a5bb 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -31,6 +31,8 @@
public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
+ public void abortReader(JobId jobId);
+
public IWorkspaceFileFactory getFileFactory();
public void close();
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
deleted file mode 100644
index 42dc157..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IDatasetPartitionWriter extends IFrameWriter {
- public Page returnPage() throws HyracksDataException;
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
index d49d5cd..f29356b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
@@ -14,7 +14,7 @@
*/
package edu.uci.ics.hyracks.api.dataset;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.job.JobId;
public interface IHyracksDatasetDirectoryServiceConnection {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
index ba21a84..3fe4ada 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
@@ -14,7 +14,7 @@
*/
package edu.uci.ics.hyracks.api.dataset;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.job.JobId;
public interface IHyracksDatasetDirectoryServiceInterface {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
index b928a49..c397a94 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
@@ -16,7 +16,7 @@
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IHyracksDatasetReader {
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 095fd7d..c882448 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -17,7 +17,7 @@
import java.net.InetSocketAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index 47cdf97..9c3b918 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -16,7 +16,7 @@
import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 6419983..08ad0d7 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -26,7 +26,7 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
@@ -118,7 +118,7 @@
while (readSize <= 0 && !(isLastPartitionReadComplete())) {
synchronized (lastMonitor) {
- while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached()) {
+ while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached() && !lastMonitor.failed()) {
try {
lastMonitor.wait();
} catch (InterruptedException e) {
@@ -127,6 +127,9 @@
}
}
+ if (lastMonitor.failed()) {
+ throw new HyracksDataException("Job Failed.");
+ }
if (isPartitionReadComplete(lastMonitor)) {
knownRecords[lastReadPartition].readEOS();
if ((lastReadPartition == knownRecords.length - 1)) {
@@ -135,12 +138,8 @@
try {
lastReadPartition++;
while (knownRecords[lastReadPartition] == null) {
- try {
- knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
- resultSetId, knownRecords);
- } catch (Exception e) {
- // Do nothing here.
- }
+ knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
+ resultSetId, knownRecords);
}
resultChannel = new DatasetNetworkInputChannel(netManager,
@@ -149,9 +148,7 @@
lastMonitor = getMonitor(lastReadPartition);
resultChannel.open(datasetClientCtx);
resultChannel.registerMonitor(lastMonitor);
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- } catch (UnknownHostException e) {
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index e648733..21b05d4 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -99,7 +99,7 @@
records[partition].writeEOS();
for (DatasetDirectoryRecord record : records) {
- if (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS) {
+ if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) {
successCount++;
}
}
@@ -112,14 +112,18 @@
@Override
public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
DatasetJobRecord djr = jobResultLocations.get(jobId);
- djr.fail();
+ if (djr != null) {
+ djr.fail();
+ }
notifyAll();
}
@Override
public synchronized void reportJobFailure(JobId jobId) {
DatasetJobRecord djr = jobResultLocations.get(jobId);
- djr.fail();
+ if (djr != null) {
+ djr.fail();
+ }
notifyAll();
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
index cecd677..4e27f12 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -21,12 +21,13 @@
import java.util.Map;
import java.util.Set;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
public class DatasetMemoryManager {
+ private int availableMemory;
+
private final Set<Page> availPages;
private final LeastRecentlyUsedList leastRecentlyUsedList;
@@ -36,29 +37,32 @@
private final static int FRAME_SIZE = 32768;
public DatasetMemoryManager(int availableMemory) {
+ this.availableMemory = availableMemory;
+
availPages = new HashSet<Page>();
// Atleast have one page for temporarily storing the results.
- if (availableMemory <= 0)
- availableMemory = FRAME_SIZE;
-
- while (availableMemory >= FRAME_SIZE) {
- /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame()
- * instead of direct ByteBuffer.allocate()?
- */
- availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
- availableMemory -= FRAME_SIZE;
- }
+ if (this.availableMemory <= FRAME_SIZE)
+ this.availableMemory = FRAME_SIZE;
leastRecentlyUsedList = new LeastRecentlyUsedList();
resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
}
- public Page requestPage(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter dpw)
- throws OutOfMemoryError, HyracksDataException {
+ public synchronized Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState)
+ throws HyracksDataException {
Page page;
if (availPages.isEmpty()) {
- page = evictPage();
+ if (availableMemory >= FRAME_SIZE) {
+ /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyracks' allocateFrame()
+ * instead of direct ByteBuffer.allocate()?
+ */
+ availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
+ availableMemory -= FRAME_SIZE;
+ page = getAvailablePage();
+ } else {
+ page = evictPage();
+ }
} else {
page = getAvailablePage();
}
@@ -71,7 +75,7 @@
* update reference call before a page is pushed on to the element of the LRU list. So we first obtain the page,
* then make a updateReference call which in turn creates a new node in the LRU list and then add the page to it.
*/
- PartitionNode pn = updateReference(resultSetPartitionId, dpw);
+ PartitionNode pn = updateReference(resultSetPartitionId, resultState);
pn.add(page);
return page;
}
@@ -81,7 +85,7 @@
updateReference(resultSetPartitionId, null);
}
- public int getPageSize() {
+ public static int getPageSize() {
return FRAME_SIZE;
}
@@ -90,28 +94,29 @@
resultPartitionNodesMap.put(resultSetPartitionId, pn);
}
- protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
- IDatasetPartitionWriter dpw) {
+ protected PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
PartitionNode pn = null;
if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
- if (dpw != null) {
- pn = new PartitionNode(resultSetPartitionId, dpw);
+ if (resultState != null) {
+ pn = new PartitionNode(resultSetPartitionId, resultState);
insertPartitionNode(resultSetPartitionId, pn);
}
return pn;
}
- pn = resultPartitionNodesMap.get(resultSetPartitionId);
- leastRecentlyUsedList.remove(pn);
- insertPartitionNode(resultSetPartitionId, pn);
+ synchronized (this) {
+ pn = resultPartitionNodesMap.get(resultSetPartitionId);
+ leastRecentlyUsedList.remove(pn);
+ insertPartitionNode(resultSetPartitionId, pn);
+ }
return pn;
}
- protected synchronized Page evictPage() throws HyracksDataException {
+ protected Page evictPage() throws HyracksDataException {
PartitionNode pn = leastRecentlyUsedList.getFirst();
- IDatasetPartitionWriter dpw = pn.getDatasetPartitionWriter();
- Page page = dpw.returnPage();
+ ResultState resultState = pn.getResultState();
+ Page page = resultState.returnPage();
/* If the partition holding the pages breaks the contract by not returning the page or it has no page, just take
* away all the pages allocated to it and add to the available pages set.
@@ -140,7 +145,7 @@
return page;
}
- protected synchronized Page getAvailablePage() {
+ protected Page getAvailablePage() {
Iterator<Page> iter = availPages.iterator();
Page page = iter.next();
iter.remove();
@@ -197,15 +202,15 @@
private final ResultSetPartitionId resultSetPartitionId;
- private final IDatasetPartitionWriter datasetPartitionWriter;
+ private final ResultState resultState;
private PartitionNode prev;
private PartitionNode next;
- public PartitionNode(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter datasetPartitionWriter) {
+ public PartitionNode(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
this.resultSetPartitionId = resultSetPartitionId;
- this.datasetPartitionWriter = datasetPartitionWriter;
+ this.resultState = resultState;
prev = null;
next = null;
}
@@ -214,8 +219,8 @@
return resultSetPartitionId;
}
- public IDatasetPartitionWriter getDatasetPartitionWriter() {
- return datasetPartitionWriter;
+ public ResultState getResultState() {
+ return resultState;
}
public void setPrev(PartitionNode node) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index af9a607..ef2902e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -17,11 +17,11 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
@@ -32,6 +32,8 @@
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
public class DatasetPartitionManager implements IDatasetPartitionManager {
+ private static final Logger LOGGER = Logger.getLogger(DatasetPartitionManager.class.getName());
+
private final NodeControllerService ncs;
private final Executor executor;
@@ -50,18 +52,29 @@
this.executor = executor;
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
- datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+ if (availableMemory >= DatasetMemoryManager.getPageSize()) {
+ datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+ } else {
+ datasetMemoryManager = null;
+ }
partitionResultStateMap = new LinkedHashMap<JobId, ResultState[]>() {
private static final long serialVersionUID = 1L;
protected boolean removeEldestEntry(Map.Entry<JobId, ResultState[]> eldest) {
- if (size() > resultHistorySize) {
- for (ResultState state : eldest.getValue()) {
- state.deinit();
+ synchronized (DatasetPartitionManager.this) {
+ if (size() > resultHistorySize) {
+ ResultState[] resultStates = eldest.getValue();
+ for (int i = 0; i < resultStates.length; i++) {
+ ResultState state = resultStates[i];
+ if (state != null) {
+ state.closeAndDelete();
+ LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
+ }
+ }
+ return true;
}
- return true;
+ return false;
}
- return false;
}
};
}
@@ -72,10 +85,10 @@
DatasetPartitionWriter dpw = null;
JobId jobId = ctx.getJobletContext().getJobId();
try {
- synchronized (partitionResultStateMap) {
+ synchronized (this) {
ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
- dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
+ dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager, fileFactory);
ResultState[] resultStates = partitionResultStateMap.get(jobId);
if (resultStates == null) {
@@ -88,12 +101,15 @@
throw new HyracksException(e);
}
+ LOGGER.fine("Initialized partition writer: JobId: " + jobId + ":partition: " + partition);
return dpw;
}
@Override
public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
try {
+ LOGGER.fine("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
+ + ":partition: " + partition);
ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
} catch (Exception e) {
throw new HyracksException(e);
@@ -103,6 +119,8 @@
@Override
public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
try {
+ LOGGER.info("Reporting partition failure: JobId: " + jobId + ": ResultSetId: " + rsId + ":partition: "
+ + partition);
ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
} catch (Exception e) {
throw new HyracksException(e);
@@ -113,7 +131,7 @@
public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
throws HyracksException {
ResultState resultState;
- synchronized (partitionResultStateMap) {
+ synchronized (this) {
ResultState[] resultStates = partitionResultStateMap.get(jobId);
if (resultStates == null) {
@@ -126,8 +144,23 @@
}
}
- IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+ DatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
dpr.writeTo(writer);
+ LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":partition: " + partition);
+ }
+
+ @Override
+ public synchronized void abortReader(JobId jobId) {
+ ResultState[] resultStates = partitionResultStateMap.get(jobId);
+
+ if (resultStates == null) {
+ return;
+ }
+ for (ResultState state : resultStates) {
+ if (state != null) {
+ state.abort();
+ }
+ }
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index a584b4b..07624de 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -20,14 +20,10 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
-import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
-public class DatasetPartitionReader implements IDatasetPartitionReader {
+public class DatasetPartitionReader {
private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
private final DatasetMemoryManager datasetMemoryManager;
@@ -36,51 +32,12 @@
private final ResultState resultState;
- private IFileHandle fileHandle;
-
public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
this.datasetMemoryManager = datasetMemoryManager;
this.executor = executor;
this.resultState = resultState;
}
- private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
- long readSize = 0;
- synchronized (resultState) {
- while (offset >= resultState.getSize() && !resultState.getEOS()) {
- try {
- resultState.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- if (offset >= resultState.getSize() && resultState.getEOS()) {
- return readSize;
- }
-
- if (offset < resultState.getPersistentSize()) {
- readSize = resultState.getIOManager().syncRead(fileHandle, offset, buffer);
- }
-
- if (readSize < buffer.capacity()) {
- long localPageOffset = offset - resultState.getPersistentSize();
- int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
- int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
- Page page = resultState.getPage(localPageIndex);
- if (page == null) {
- return readSize;
- }
- readSize += buffer.remaining();
- buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
- }
-
- datasetMemoryManager.pageReferenced(resultState.getResultSetPartitionId());
- return readSize;
- }
-
- @Override
public void writeTo(final IFrameWriter writer) {
executor.execute(new Runnable() {
@Override
@@ -88,8 +45,7 @@
NetworkOutputChannel channel = (NetworkOutputChannel) writer;
channel.setFrameSize(resultState.getFrameSize());
try {
- fileHandle = resultState.getIOManager().open(resultState.getValidFileReference(),
- IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ resultState.readOpen();
channel.open();
try {
long offset = 0;
@@ -109,10 +65,8 @@
}
} finally {
channel.close();
- resultState.getIOManager().close(fileHandle);
+ resultState.readClose();
}
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
} catch (HyracksDataException e) {
throw new RuntimeException(e);
}
@@ -120,6 +74,14 @@
LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
}
}
+
+ private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ if (datasetMemoryManager == null) {
+ return resultState.read(offset, buffer);
+ } else {
+ return resultState.read(datasetMemoryManager, offset, buffer);
+ }
+ }
});
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 317f553..8f4b639 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -18,24 +18,19 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
-import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
-public class DatasetPartitionWriter implements IDatasetPartitionWriter {
+public class DatasetPartitionWriter implements IFrameWriter {
private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
- private static final String FILE_PREFIX = "result_";
-
private final IDatasetPartitionManager manager;
private final JobId jobId;
@@ -50,10 +45,9 @@
private final ResultState resultState;
- private IFileHandle fileHandle;
-
public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
- ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager) {
+ ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager,
+ IWorkspaceFileFactory fileFactory) {
this.manager = manager;
this.jobId = jobId;
this.resultSetId = rsId;
@@ -61,7 +55,7 @@
this.datasetMemoryManager = datasetMemoryManager;
resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
- resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), ctx.getFrameSize());
+ resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), fileFactory, ctx.getFrameSize());
}
public ResultState getResultState() {
@@ -69,41 +63,27 @@
}
@Override
- public void open() throws HyracksDataException {
+ public void open() {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("open(" + partition + ")");
}
- String fName = FILE_PREFIX + String.valueOf(partition);
- FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
- fileHandle = resultState.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- resultState.init(fRef, fileHandle);
+ resultState.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- int srcOffset = 0;
- Page destPage = resultState.getLastPage();
-
- while (srcOffset < buffer.limit()) {
- if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
- destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
- resultState.addPage(destPage);
- }
- int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
- destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
- srcOffset += srcLength;
- resultState.incrementSize(srcLength);
- }
-
- synchronized (resultState) {
- resultState.notifyAll();
+ if (datasetMemoryManager == null) {
+ resultState.write(buffer);
+ } else {
+ resultState.write(datasetMemoryManager, buffer);
}
}
@Override
public void fail() throws HyracksDataException {
try {
+ resultState.closeAndDelete();
+ resultState.abort();
manager.reportPartitionFailure(jobId, resultSetId, partition);
} catch (HyracksException e) {
throw new HyracksDataException(e);
@@ -117,32 +97,10 @@
}
try {
- synchronized (resultState) {
- resultState.setEOS(true);
- resultState.notifyAll();
- }
+ resultState.close();
manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
} catch (HyracksException e) {
throw new HyracksDataException(e);
}
}
-
- @Override
- public Page returnPage() throws HyracksDataException {
- Page page = resultState.removePage(0);
-
- IIOManager ioManager = resultState.getIOManager();
-
- // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
- if (page == null) {
- ioManager.close(fileHandle);
- return null;
- }
-
- page.getBuffer().flip();
-
- long delta = ioManager.syncWrite(fileHandle, resultState.getPersistentSize(), page.getBuffer());
- resultState.incrementPersistentSize(delta);
- return page;
- }
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 661df93..911f372 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -17,28 +17,35 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IFileHandle;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
public class ResultState implements IStateObject {
+ private static final String FILE_PREFIX = "result_";
+
private final ResultSetPartitionId resultSetPartitionId;
private final int frameSize;
private final IIOManager ioManager;
+ private final IWorkspaceFileFactory fileFactory;
+
private final AtomicBoolean eos;
- private final AtomicBoolean readEOS;
+ private final AtomicBoolean failed;
private final List<Page> localPageList;
@@ -46,29 +53,40 @@
private IFileHandle writeFileHandle;
+ private IFileHandle readFileHandle;
+
private long size;
private long persistentSize;
- ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, int frameSize) {
+ ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, IWorkspaceFileFactory fileFactory,
+ int frameSize) {
this.resultSetPartitionId = resultSetPartitionId;
this.ioManager = ioManager;
+ this.fileFactory = fileFactory;
this.frameSize = frameSize;
eos = new AtomicBoolean(false);
- readEOS = new AtomicBoolean(false);
+ failed = new AtomicBoolean(false);
localPageList = new ArrayList<Page>();
+
+ fileRef = null;
+ writeFileHandle = null;
}
- public synchronized void init(FileReference fileRef, IFileHandle writeFileHandle) {
- this.fileRef = fileRef;
- this.writeFileHandle = writeFileHandle;
-
+ public synchronized void open() {
size = 0;
persistentSize = 0;
+ }
+
+ public synchronized void close() {
+ eos.set(true);
notifyAll();
}
- public synchronized void deinit() {
+ public synchronized void closeAndDelete() {
+ // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action needs
+ // to be taken when there are further requests to these result states.
+ failed.set(true);
if (writeFileHandle != null) {
try {
ioManager.close(writeFileHandle);
@@ -76,7 +94,149 @@
// Since file handle could not be closed, just ignore.
}
}
- fileRef.delete();
+ if (fileRef != null) {
+ fileRef.delete();
+ }
+ }
+
+ public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
+ if (fileRef == null) {
+ String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+ fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+ writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ }
+
+ size += ioManager.syncWrite(writeFileHandle, size, buffer);
+
+ notifyAll();
+ }
+
+ public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
+ throws HyracksDataException {
+ int srcOffset = 0;
+ Page destPage = null;
+
+ if (!localPageList.isEmpty()) {
+ destPage = localPageList.get(localPageList.size() - 1);
+ }
+
+ while (srcOffset < buffer.limit()) {
+ if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+ destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+ localPageList.add(destPage);
+ }
+ int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+ destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+ srcOffset += srcLength;
+ size += srcLength;
+ }
+
+ notifyAll();
+ }
+
+ public synchronized void readOpen() {
+ // This is a no-op for now; it is left here to keep the API stable for future use.
+ }
+
+ public synchronized void readClose() throws HyracksDataException {
+ if (readFileHandle != null) {
+ ioManager.close(readFileHandle);
+ }
+ }
+
+ public synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ long readSize = 0;
+
+ while (offset >= size && !eos.get() && !failed.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ if ((offset >= size && eos.get()) || failed.get()) {
+ return readSize;
+ }
+
+ if (readFileHandle == null) {
+ initReadFileHandle();
+ }
+ readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+
+ return readSize;
+ }
+
+ public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
+ throws HyracksDataException {
+ long readSize = 0;
+ synchronized (this) {
+ while (offset >= size && !eos.get() && !failed.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ if ((offset >= size && eos.get()) || failed.get()) {
+ return readSize;
+ }
+
+ if (offset < persistentSize) {
+ if (readFileHandle == null) {
+ initReadFileHandle();
+ }
+ readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+ }
+
+ if (readSize < buffer.capacity()) {
+ long localPageOffset = offset - persistentSize;
+ int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize());
+ int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize());
+ Page page = getPage(localPageIndex);
+ if (page == null) {
+ return readSize;
+ }
+ readSize += buffer.remaining();
+ buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+ }
+ }
+ datasetMemoryManager.pageReferenced(resultSetPartitionId);
+ return readSize;
+ }
+
+ public synchronized void abort() {
+ failed.set(true);
+ notifyAll();
+ }
+
+ public synchronized Page returnPage() throws HyracksDataException {
+ Page page = removePage();
+
+ // If there are no more pages to be given back, close the write channel since we will not write any more, and return null.
+ if (page == null) {
+ ioManager.close(writeFileHandle);
+ return null;
+ }
+
+ page.getBuffer().flip();
+
+ if (fileRef == null) {
+ String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+ fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+ writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ notifyAll();
+ }
+
+ long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer());
+ persistentSize += delta;
+ return page;
+ }
+
+ public synchronized void setEOS(boolean eos) {
+ this.eos.set(eos);
}
public ResultSetPartitionId getResultSetPartitionId() {
@@ -91,76 +251,6 @@
return ioManager;
}
- public synchronized void incrementSize(long delta) {
- size += delta;
- }
-
- public synchronized long getSize() {
- return size;
- }
-
- public synchronized void incrementPersistentSize(long delta) {
- persistentSize += delta;
- }
-
- public synchronized long getPersistentSize() {
- return persistentSize;
- }
-
- public void setEOS(boolean eos) {
- this.eos.set(eos);
- }
-
- public boolean getEOS() {
- return eos.get();
- }
-
- public boolean getReadEOS() {
- return readEOS.get();
- }
-
- public synchronized void addPage(Page page) {
- localPageList.add(page);
- }
-
- public synchronized Page removePage(int index) {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.remove(index);
- }
- return page;
- }
-
- public synchronized Page getPage(int index) {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.get(index);
- }
- return page;
- }
-
- public synchronized Page getLastPage() {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.get(localPageList.size() - 1);
- }
- return page;
- }
-
- public synchronized Page getFirstPage() {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.get(0);
- }
- return page;
- }
-
- public synchronized FileReference getValidFileReference() throws InterruptedException {
- while (fileRef == null)
- wait();
- return fileRef;
- }
-
@Override
public JobId getJobId() {
return resultSetPartitionId.getJobId();
@@ -185,4 +275,36 @@
public void fromBytes(DataInput in) throws IOException {
throw new UnsupportedOperationException();
}
+
+ private Page getPage(int index) {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.get(index);
+ }
+ return page;
+ }
+
+ private Page removePage() {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.remove(localPageList.size() - 1);
+ }
+ return page;
+ }
+
+ private void initReadFileHandle() throws HyracksDataException {
+ while (fileRef == null && !failed.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ if (failed.get()) {
+ return;
+ }
+
+ readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
index 8f8c032..a078f50 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -46,6 +46,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
}
+ ncs.getDatasetPartitionManager().abortReader(jobId);
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji != null) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 3957934..8b9d15a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.nc.work;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.Task;
@@ -32,8 +33,9 @@
@Override
public void run() {
try {
- ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
- ncs.getId(), details);
+ JobId jobId = task.getJobletContext().getJobId();
+ ncs.getDatasetPartitionManager().abortReader(jobId);
+ ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), ncs.getId(), details);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
index 07f6ba2..c403501 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
@@ -32,6 +32,9 @@
}
public void reset(ByteBuffer buffer, boolean clear) {
+ if (clear) {
+ buffer.clear();
+ }
frameTupleAppender.reset(buffer, clear);
}