Added connector policy support to runtime
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@399 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index 0c515f4..bf4dde1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -20,8 +20,10 @@
import java.util.Set;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.scheduler.IActivityClusterStateMachine;
+import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
public class ActivityCluster {
private final JobRun jobRun;
@@ -38,6 +40,8 @@
private IActivityClusterStateMachine acsm;
+ private Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+
public ActivityCluster(JobRun jobRun, Set<ActivityNodeId> activities) {
this.jobRun = jobRun;
this.activities = activities;
@@ -62,7 +66,7 @@
return dependencies;
}
- public Map<ActivityNodeId, Task[]> getTaskStateMap() {
+ public Map<ActivityNodeId, Task[]> getTaskMap() {
return taskStateMap;
}
@@ -97,4 +101,12 @@
public void notifyActivityClusterComplete() throws HyracksException {
jobRun.getStateMachine().notifyActivityClusterComplete(this);
}
+
+ public void setConnectorPolicyMap(Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+ this.connectorPolicies = connectorPolicies;
+ }
+
+ public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
+ return connectorPolicies;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
index a69b144..1fb4bc3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
@@ -51,7 +51,7 @@
Map<ActivityNodeId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
if (ac != null) {
- Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskStateMap();
+ Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
Task[] taskStates = taskStateMap.get(tid.getActivityId());
if (taskStates != null && taskStates.length > tid.getPartition()) {
Task ts = taskStates[tid.getPartition()];
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
index cdd6a65..c325433 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
@@ -45,6 +46,7 @@
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
public class DefaultActivityClusterStateMachine implements IActivityClusterStateMachine {
private static final Logger LOGGER = Logger.getLogger(DefaultActivityClusterStateMachine.class.getName());
@@ -223,6 +225,7 @@
final UUID jobId = jobRun.getJobId();
final JobActivityGraph jag = jobRun.getJobActivityGraph();
final String appName = jag.getApplicationName();
+ final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = ac.getConnectorPolicyMap();
for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
String nodeId = e.getKey();
final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
@@ -234,7 +237,7 @@
public void run() {
try {
node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
- taskDescriptors);
+ taskDescriptors, connectorPolicies);
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
index 2ea10d6..5737743 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
@@ -52,6 +52,7 @@
import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
import edu.uci.ics.hyracks.control.common.job.dataflow.PipelinedConnectorPolicy;
+import edu.uci.ics.hyracks.control.common.job.dataflow.SendSideMaterializedConnectorPolicy;
public class DefaultJobRunStateMachine implements IJobRunStateMachine {
private static final Logger LOGGER = Logger.getLogger(DefaultJobRunStateMachine.class.getName());
@@ -327,7 +328,8 @@
private void buildTaskClusters(ActivityCluster ac) throws HyracksException {
Map<ActivityNodeId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
- Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskStateMap();
+
+ Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
for (ActivityNodeId anId : ac.getActivities()) {
ActivityPartitionDetails apd = pcMap.get(anId);
@@ -337,11 +339,11 @@
}
taskStateMap.put(anId, taskStates);
}
-
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, pcMap);
+ ac.setConnectorPolicyMap(connectorPolicies);
+
Set<ActivityNodeId> activities = ac.getActivities();
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
-
Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
for (ActivityNodeId anId : activities) {
Task[] taskStates = taskStateMap.get(anId);
@@ -364,7 +366,7 @@
ConnectorDescriptorId cdId = c.getConnectorId();
IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
if (cPolicy == null) {
- cPolicy = new PipelinedConnectorPolicy();
+ cPolicy = new SendSideMaterializedConnectorPolicy();
}
ActivityNodeId ac2 = jag.getConsumerActivity(cdId);
Task[] ac2TaskStates = taskStateMap.get(ac2);
@@ -463,6 +465,41 @@
}
}
+ private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
+ Map<ActivityNodeId, ActivityPartitionDetails> pcMap) {
+ JobActivityGraph jag = jobRun.getJobActivityGraph();
+ Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+ Set<ActivityNodeId> activities = ac.getActivities();
+ Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
+ BitSet targetBitmap = new BitSet();
+ for (ActivityNodeId ac1 : activities) {
+ Task[] ac1TaskStates = taskStateMap.get(ac1);
+ int nProducers = ac1TaskStates.length;
+ List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+ if (outputConns != null) {
+ for (IConnectorDescriptor c : outputConns) {
+ ConnectorDescriptorId cdId = c.getConnectorId();
+ ActivityNodeId ac2 = jag.getConsumerActivity(cdId);
+ Task[] ac2TaskStates = taskStateMap.get(ac2);
+ int nConsumers = ac2TaskStates.length;
+
+ int[] fanouts = new int[nProducers];
+ for (int i = 0; i < nProducers; ++i) {
+ c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+ fanouts[i] = targetBitmap.cardinality();
+ }
+ IConnectorPolicy cp = assignConnectorPolicy(c, nProducers, nConsumers, fanouts);
+ cPolicyMap.put(cdId, cp);
+ }
+ }
+ }
+ return cPolicyMap;
+ }
+
+ private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
+ return new PipelinedConnectorPolicy();
+ }
+
private void computeDependencyClosure(Set<TaskCluster> tcSet) {
boolean done = false;
while (!done) {
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 4043baa..54b31f8 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -16,12 +16,15 @@
import java.rmi.Remote;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
public interface INodeController extends Remote {
public String getId() throws Exception;
@@ -30,8 +33,8 @@
public NodeCapability getNodeCapability() throws Exception;
- public void startTasks(String appName, UUID jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors)
- throws Exception;
+ public void startTasks(String appName, UUID jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception;
public void abortTasks(UUID jobId, List<TaskAttemptId> tasks) throws Exception;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
new file mode 100644
index 0000000..235cd3c
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.dataflow;
+
+public final class SendSideMaterializedConnectorPolicy implements IConnectorPolicy {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean requiresProducerConsumerCoscheduling() {
+ return false;
+ }
+
+ @Override
+ public boolean consumerWaitsForProducerToFinish() {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 2f28b92..c7636f8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -52,6 +52,7 @@
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -75,6 +76,9 @@
import edu.uci.ics.hyracks.control.common.base.NodeParameters;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
+import edu.uci.ics.hyracks.control.common.job.dataflow.PipelinedConnectorPolicy;
+import edu.uci.ics.hyracks.control.common.job.dataflow.SendSideMaterializedConnectorPolicy;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -82,6 +86,7 @@
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.net.ConnectionManager;
import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
@@ -137,6 +142,10 @@
applications = new Hashtable<String, NCApplicationContext>();
}
+ public IHyracksRootContext getRootContext() {
+ return ctx;
+ }
+
private static List<IODeviceHandle> getDevices(String ioDevices) {
List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
StringTokenizer tok = new StringTokenizer(ioDevices, ",");
@@ -169,6 +178,7 @@
@Override
public void stop() throws Exception {
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+ partitionManager.close();
connectionManager.stop();
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
}
@@ -218,8 +228,10 @@
return InetAddress.getByAddress(ipBytes);
}
+ @Override
public void startTasks(String appName, final UUID jobId, byte[] jagBytes,
- List<TaskAttemptDescriptor> taskDescriptors) throws Exception {
+ List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) throws Exception {
try {
NCApplicationContext appCtx = applications.get(appName);
final JobActivityGraph plan = (JobActivityGraph) appCtx.deserialize(jagBytes);
@@ -273,13 +285,11 @@
for (int i = 0; i < outputs.size(); ++i) {
final IConnectorDescriptor conn = outputs.get(i);
RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
- IPartitionWriterFactory pwFactory = new IPartitionWriterFactory() {
- @Override
- public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new PipelinedPartition(partitionManager, new PartitionId(jobId,
- conn.getConnectorId(), partition, receiverIndex));
- }
- };
+ IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+
+ IPartitionWriterFactory pwFactory = createPartitionWriterFactory(cPolicy, jobId, conn,
+ partition);
+
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("output: " + i + ": " + conn.getConnectorId());
}
@@ -300,6 +310,28 @@
}
}
+ private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final UUID jobId,
+ final IConnectorDescriptor conn, final int senderIndex) {
+ if (cPolicy instanceof PipelinedConnectorPolicy) {
+ return new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new PipelinedPartition(partitionManager, new PartitionId(jobId, conn.getConnectorId(),
+ senderIndex, receiverIndex));
+ }
+ };
+ } else if (cPolicy instanceof SendSideMaterializedConnectorPolicy) {
+ return new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new MaterializedPartitionWriter(ctx, partitionManager, new PartitionId(jobId,
+ conn.getConnectorId(), senderIndex, receiverIndex), executor);
+ }
+ };
+ }
+ throw new IllegalArgumentException("Unknown connector policy: " + cPolicy);
+ }
+
private synchronized Joblet getOrCreateLocalJoblet(UUID jobId, INCApplicationContext appCtx) throws Exception {
Joblet ji = jobletMap.get(jobId);
if (ji == null) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
index 6833cbe..97f82d9 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -56,23 +56,28 @@
try {
FileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- writer.open();
try {
- long offset = 0;
- ByteBuffer buffer = ctx.allocateFrame();
- while (true) {
- buffer.clear();
- long size = ioManager.syncRead(fh, offset, buffer);
- if (size < 0) {
- break;
- } else if (size < buffer.capacity()) {
- throw new HyracksDataException("Premature end of file");
+ writer.open();
+ try {
+ long offset = 0;
+ ByteBuffer buffer = ctx.allocateFrame();
+ while (true) {
+ buffer.clear();
+ long size = ioManager.syncRead(fh, offset, buffer);
+ if (size < 0) {
+ break;
+ } else if (size < buffer.capacity()) {
+ throw new HyracksDataException("Premature end of file");
+ }
+ offset += size;
+ buffer.flip();
+ writer.nextFrame(buffer);
}
- buffer.flip();
- writer.nextFrame(buffer);
+ } finally {
+ writer.close();
}
} finally {
- writer.close();
+ ioManager.close(fh);
}
} catch (HyracksDataException e) {
throw new RuntimeException(e);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
new file mode 100644
index 0000000..a74f099
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileHandle;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializedPartitionWriter implements IFrameWriter {
+ protected final IHyracksRootContext ctx;
+
+ protected final PartitionManager manager;
+
+ protected final PartitionId pid;
+
+ protected final Executor executor;
+
+ private FileReference fRef;
+
+ private FileHandle handle;
+
+ private long size;
+
+ public MaterializedPartitionWriter(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
+ Executor executor) {
+ this.ctx = ctx;
+ this.manager = manager;
+ this.pid = pid;
+ this.executor = executor;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+ handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ size = 0;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ size += ctx.getIOManager().syncWrite(handle, size, buffer);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ ctx.getIOManager().close(handle);
+ manager.registerPartition(pid, new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()));
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index 27afb8c..4adb20a 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -23,9 +23,13 @@
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
public class PartitionManager implements IPartitionRequestListener {
private final NetworkAddress dataPort;
@@ -34,10 +38,16 @@
private final Map<PartitionId, IPartition> partitionMap;
+ private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+ private final IWorkspaceFileFactory fileFactory;
+
public PartitionManager(NodeControllerService ncs, NetworkAddress dataPort) {
this.dataPort = dataPort;
this.ncs = ncs;
partitionMap = new HashMap<PartitionId, IPartition>();
+ deallocatableRegistry = new DefaultDeallocatableRegistry();
+ fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
}
public void registerPartition(PartitionId pid, IPartition partition) throws HyracksDataException {
@@ -79,4 +89,12 @@
throw new HyracksException("Request for unknown partition " + partitionId);
}
}
+
+ public IWorkspaceFileFactory getFileFactory() {
+ return fileFactory;
+ }
+
+ public void close() {
+ deallocatableRegistry.close();
+ }
}
\ No newline at end of file