Added ability to register TaskState with the infrastructure
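
IOperatorEnvironment no longer exposes the untyped set(String, Object) /
get(String) pair. Operators now register typed ITaskState objects keyed by
TaskId and look them up from a downstream activity of the same operator;
the Joblet keeps the registered states in a per-joblet map. The group-by
and join operators in hyracks-dataflow-std are ported to the new API via
the new AbstractTaskState base class, and the default connector policy
assignment changes from send/receive-side materializing blocking to
PipeliningConnectorPolicy.

A minimal sketch of the new pattern as used by the ported operators
(MyTaskState and PRODUCER_ACTIVITY_ID are placeholder names; the patch
itself uses e.g. AggregateActivityState and AGGREGATE_ACTIVITY_ID):

    // Producing activity: create the state for this (activity, partition)
    // task, populate it, and register it with the infrastructure on close.
    MyTaskState state = new MyTaskState(ctx.getJobletContext().getJobId(),
            new TaskId(getActivityId(), partition));
    // ... fill in the state's runtime structures ...
    env.setTaskState(state);

    // Consuming activity of the same operator: look the state up again
    // under the producer activity's TaskId for the same partition.
    MyTaskState state2 = (MyTaskState) env.getTaskState(new TaskId(
            new ActivityId(getOperatorId(), PRODUCER_ACTIVITY_ID), partition));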
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@565 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java
new file mode 100644
index 0000000..c24e488
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.state;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.job.JobId;
+
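+/**
+ * Runtime state produced by a task. A task registers its state with the
+ * operator environment under its {@link TaskId} so that a downstream
+ * activity of the same operator can retrieve it later.
+ */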
+public interface ITaskState {
+ public JobId getJobId();
+
+ public TaskId getTaskId();
+
+ public long getMemoryOccupancy();
+
+ public void toBytes(DataOutput out) throws IOException;
+
+ public void fromBytes(DataInput in) throws IOException;
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
index b41cb35..ecc44c1 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
@@ -14,8 +14,11 @@
*/
package edu.uci.ics.hyracks.api.job;
-public interface IOperatorEnvironment {
- public void set(String name, Object value);
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
- public Object get(String name);
+public interface IOperatorEnvironment {
+ public void setTaskState(ITaskState taskState);
+
+ public ITaskState getTaskState(TaskId taskId);
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index a3ca82d..e5d9f64 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -34,7 +34,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -275,7 +275,7 @@
if (cpap != null) {
return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
}
- return new SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy();
+ return new PipeliningConnectorPolicy();
}
private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 3026359..2176dda 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -27,6 +27,8 @@
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -61,6 +63,8 @@
private final Map<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>> envMap;
+ private final Map<TaskId, ITaskState> taskStateMap;
+
private final Map<TaskAttemptId, Task> taskMap;
private final Map<String, Counter> counterMap;
@@ -77,6 +81,7 @@
this.jobId = jobId;
partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+ taskStateMap = new HashMap<TaskId, ITaskState>();
taskMap = new HashMap<TaskAttemptId, Task>();
counterMap = new HashMap<String, Counter>();
localVariableMap = new HashMap<MultipartName, Object>();
@@ -119,28 +124,26 @@
localVariableMap.put(name, value);
}
- private static final class OperatorEnvironmentImpl implements IOperatorEnvironment {
+ private final class OperatorEnvironmentImpl implements IOperatorEnvironment {
private final String nodeId;
- private final Map<String, Object> map;
public OperatorEnvironmentImpl(String nodeId) {
this.nodeId = nodeId;
- map = new HashMap<String, Object>();
- }
-
- @Override
- public Object get(String name) {
- return map.get(name);
- }
-
- @Override
- public void set(String name, Object value) {
- map.put(name, value);
}
public String toString() {
return super.toString() + "@" + nodeId;
}
+
+ @Override
+ public void setTaskState(ITaskState taskState) {
+ taskStateMap.put(taskState.getTaskId(), taskState);
+ }
+
+ @Override
+ public ITaskState getTaskState(TaskId taskId) {
+ return taskStateMap.get(taskId);
+ }
}
public Executor getExecutor() {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java
new file mode 100644
index 0000000..ba4dfd1
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java
@@ -0,0 +1,52 @@
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.job.JobId;
+
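+/**
+ * Convenience base class for {@link ITaskState} implementations that holds
+ * the owning job id, the task id, and a memory occupancy estimate.
+ */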
+public abstract class AbstractTaskState implements ITaskState {
+ protected JobId jobId;
+
+ protected TaskId taId;
+
+ protected long memoryOccupancy;
+
+ protected AbstractTaskState() {
+ }
+
+ protected AbstractTaskState(JobId jobId, TaskId taId) {
+ this.jobId = jobId;
+ this.taId = taId;
+ }
+
+ @Override
+ public final JobId getJobId() {
+ return jobId;
+ }
+
+ public final void setJobId(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public final TaskId getTaskId() {
+ return taId;
+ }
+
+ public final void setTaskId(TaskId tId) {
+ this.taId = tId;
+ }
+
+ @Override
+ public final long getMemoryOccupancy() {
+ return memoryOccupancy;
+ }
+
+ public void setMemoryOccupancy(long memoryOccupancy) {
+ this.memoryOccupancy = memoryOccupancy;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 692a55a..5f78421 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.dataflow.std.group;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -27,6 +29,7 @@
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
@@ -35,6 +38,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -46,21 +50,18 @@
import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final int AGGREGATE_ACTIVITY_ID = 0;
+
+ private static final int MERGE_ACTIVITY_ID = 1;
+
private static final long serialVersionUID = 1L;
- /**
- * The input frame identifier (in the job environment)
- */
- private static final String GROUPTABLES = "gtables";
- /**
- * The runs files identifier (in the job environment)
- */
- private static final String RUNS = "runs";
private final int[] keyFields;
private final IBinaryComparatorFactory[] comparatorFactories;
private final INormalizedKeyComputerFactory firstNormalizerFactory;
@@ -102,8 +103,8 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), 0));
- MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, 1));
+ AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
+ MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
builder.addActivity(aggregateAct);
builder.addSourceEdge(0, aggregateAct, 0);
@@ -114,6 +115,29 @@
builder.addBlockingEdge(aggregateAct, mergeAct);
}
+ public static class AggregateActivityState extends AbstractTaskState {
+ private LinkedList<RunFileReader> runs;
+
+ private ISpillableTable gTable;
+
+ public AggregateActivityState() {
+ }
+
+ private AggregateActivityState(JobId jobId, TaskId tId) {
+ super(jobId, tId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private class AggregateActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@@ -123,25 +147,23 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
- final ISpillableTable gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields,
- comparatorFactories, firstNormalizerFactory, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
- ExternalGroupOperatorDescriptor.this.framesLimit);
final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-
- /**
- * Run files
- */
- private LinkedList<RunFileReader> runs;
+ private AggregateActivityState state;
@Override
public void open() throws HyracksDataException {
- runs = new LinkedList<RunFileReader>();
- gTable.reset();
+ state = new AggregateActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+ partition));
+ state.runs = new LinkedList<RunFileReader>();
+ state.gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories,
+ firstNormalizerFactory, aggregatorFactory,
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+ ExternalGroupOperatorDescriptor.this.framesLimit);
+ state.gTable.reset();
}
@Override
@@ -153,9 +175,9 @@
* If the group table is too large, flush the table into
* a run file.
*/
- if (!gTable.insert(accessor, i)) {
+ if (!state.gTable.insert(accessor, i)) {
flushFramesToRun();
- if (!gTable.insert(accessor, i))
+ if (!state.gTable.insert(accessor, i))
throw new HyracksDataException(
"Failed to insert a new buffer into the aggregate operator!");
}
@@ -169,21 +191,17 @@
@Override
public void close() throws HyracksDataException {
- if (gTable.getFrameCount() >= 0) {
- if (runs.size() <= 0) {
- /**
- * All in memory
- */
- env.set(GROUPTABLES, gTable);
- } else {
+ if (state.gTable.getFrameCount() >= 0) {
+ if (state.runs.size() > 0) {
/**
* flush the memory into the run file.
*/
flushFramesToRun();
- gTable.close();
+ state.gTable.close();
+ state.gTable = null;
}
}
- env.set(RUNS, runs);
+ env.setTaskState(state);
}
private void flushFramesToRun() throws HyracksDataException {
@@ -197,15 +215,15 @@
RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
writer.open();
try {
- gTable.sortFrames();
- gTable.flushFrames(writer, true);
+ state.gTable.sortFrames();
+ state.gTable.flushFrames(writer, true);
} catch (Exception ex) {
throw new HyracksDataException(ex);
} finally {
writer.close();
}
- gTable.reset();
- runs.add(((RunFileWriter) writer).createReader());
+ state.gTable.reset();
+ state.runs.add(((RunFileWriter) writer).createReader());
}
};
return op;
@@ -221,7 +239,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
@@ -252,11 +270,10 @@
*/
private ByteBuffer outFrame, writerFrame;
- /**
- * List of the run files to be merged
- */
private LinkedList<RunFileReader> runs;
+ private AggregateActivityState aggState;
+
/**
* how many frames to be read ahead once
*/
@@ -270,21 +287,21 @@
private ArrayTupleBuilder finalTupleBuilder;
private FrameTupleAppender writerFrameAppender;
- @SuppressWarnings("unchecked")
public void initialize() throws HyracksDataException {
- runs = (LinkedList<RunFileReader>) env.get(RUNS);
+ aggState = (AggregateActivityState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ AGGREGATE_ACTIVITY_ID), partition));
+ runs = aggState.runs;
writer.open();
try {
if (runs.size() <= 0) {
- ISpillableTable gTable = (ISpillableTable) env.get(GROUPTABLES);
+ ISpillableTable gTable = aggState.gTable;
if (gTable != null) {
if (isOutputSorted)
gTable.sortFrames();
gTable.flushFrames(writer, false);
}
- env.set(GROUPTABLES, null);
} else {
- long start = System.currentTimeMillis();
+ runs = new LinkedList<RunFileReader>(runs);
inFrames = new ArrayList<ByteBuffer>();
outFrame = ctx.allocateFrame();
outFrameAppender.reset(outFrame, true);
@@ -297,8 +314,6 @@
}
}
inFrames.clear();
- long end = System.currentTimeMillis();
- System.out.println("merge time " + (end - start));
}
} catch (Exception e) {
writer.fail();
@@ -306,7 +321,6 @@
} finally {
writer.close();
}
- env.set(RUNS, null);
}
private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index a01c80e..625ee25 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -14,27 +14,35 @@
*/
package edu.uci.ics.hyracks.dataflow.std.group;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final String HASHTABLE = "hashtable";
+ private static final int HASH_BUILD_ACTIVITY_ID = 0;
+
+ private static final int OUTPUT_ACTIVITY_ID = 1;
private static final long serialVersionUID = 1L;
@@ -58,10 +66,10 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, 0));
+ HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, HASH_BUILD_ACTIVITY_ID));
builder.addActivity(ha);
- OutputActivity oa = new OutputActivity(new ActivityId(odId, 1));
+ OutputActivity oa = new OutputActivity(new ActivityId(odId, OUTPUT_ACTIVITY_ID));
builder.addActivity(oa);
builder.addSourceEdge(0, ha, 0);
@@ -69,6 +77,27 @@
builder.addBlockingEdge(ha, oa);
}
+ public static class HashBuildActivityState extends AbstractTaskState {
+ private GroupingHashTable table;
+
+ public HashBuildActivityState() {
+ }
+
+ private HashBuildActivityState(JobId jobId, TaskId tId) {
+ super(jobId, tId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private class HashBuildActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@@ -78,15 +107,17 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private GroupingHashTable table;
+ private HashBuildActivityState state;
@Override
public void open() throws HyracksDataException {
- table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
+ state = new HashBuildActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+ partition));
+ state.table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
tableSize);
}
@@ -96,13 +127,13 @@
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
- table.insert(accessor, i);
+ state.table.insert(accessor, i);
}
}
@Override
public void close() throws HyracksDataException {
- env.set(HASHTABLE, table);
+ env.setTaskState(state);
}
@Override
@@ -121,11 +152,13 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- GroupingHashTable table = (GroupingHashTable) env.get(HASHTABLE);
+ HashBuildActivityState buildState = (HashBuildActivityState) env.getTaskState(new TaskId(
+ new ActivityId(getOperatorId(), HASH_BUILD_ACTIVITY_ID), partition));
+ GroupingHashTable table = buildState.table;
writer.open();
try {
table.write(writer);
@@ -135,7 +168,6 @@
} finally {
writer.close();
}
- env.set(HASHTABLE, null);
}
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index d530560..7f33bc4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -14,12 +14,16 @@
*/
package edu.uci.ics.hyracks.dataflow.std.join;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -31,6 +35,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -42,12 +47,14 @@
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final String RELATION0 = "Rel0";
- private static final String RELATION1 = "Rel1";
+ private static final int RPARTITION_ACTIVITY_ID = 0;
+ private static final int SPARTITION_ACTIVITY_ID = 1;
+ private static final int JOIN_ACTIVITY_ID = 2;
private static final long serialVersionUID = 1L;
private final int[] keys0;
@@ -98,9 +105,11 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, 0), RELATION0, keys0, 0);
- HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, 1), RELATION1, keys1, 1);
- JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, 2));
+ HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, RPARTITION_ACTIVITY_ID),
+ keys0, 0);
+ HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, SPARTITION_ACTIVITY_ID),
+ keys1, 1);
+ JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, JOIN_ACTIVITY_ID));
builder.addActivity(rpart);
builder.addSourceEdge(0, rpart, 0);
@@ -119,26 +128,43 @@
return memsize;
}
+ public static class HashPartitionTaskState extends AbstractTaskState {
+ private RunFileWriter[] fWriters;
+
+ public HashPartitionTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private class HashPartitionActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- private String partitionsKey;
private int operatorInputIndex;
private int keys[];
- public HashPartitionActivityNode(ActivityId id, String partitionsKey, int keys[], int operatorInputIndex) {
+ public HashPartitionActivityNode(ActivityId id, int keys[], int operatorInputIndex) {
super(id);
- this.partitionsKey = partitionsKey;
this.keys = keys;
this.operatorInputIndex = operatorInputIndex;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- final IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+ final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(),
recordDescProvider.getInputRecordDescriptor(getOperatorId(), operatorInputIndex));
@@ -147,9 +173,10 @@
hashFunctionFactories).createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+
private ByteBuffer[] outbufs;
- private RunFileWriter[] fWriters;
- private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
+
+ private HashPartitionTaskState state;
@Override
public void close() throws HyracksDataException {
@@ -162,23 +189,24 @@
closeWriter(i);
}
- env.set(partitionsKey, fWriters);
+ env.setTaskState(state);
}
private void closeWriter(int i) throws HyracksDataException {
- RunFileWriter writer = fWriters[i];
+ RunFileWriter writer = state.fWriters[i];
if (writer != null) {
writer.close();
}
}
private void write(int i, ByteBuffer head) throws HyracksDataException {
- RunFileWriter writer = fWriters[i];
+ RunFileWriter writer = state.fWriters[i];
if (writer == null) {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(partitionsKey);
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ GraceHashJoinOperatorDescriptor.class.getSimpleName());
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
- fWriters[i] = writer;
+ state.fWriters[i] = writer;
}
writer.nextFrame(head);
}
@@ -208,8 +236,10 @@
@Override
public void open() throws HyracksDataException {
+ state = new HashPartitionTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+ partition));
outbufs = new ByteBuffer[numPartitions];
- fWriters = new RunFileWriter[numPartitions];
+ state.fWriters = new RunFileWriter[numPartitions];
for (int i = 0; i < numPartitions; i++) {
outbufs[i] = ctx.allocateFrame();
}
@@ -232,7 +262,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- final IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+ final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -255,8 +285,12 @@
@Override
public void initialize() throws HyracksDataException {
- buildWriters = (RunFileWriter[]) env.get(RELATION1);
- probeWriters = (RunFileWriter[]) env.get(RELATION0);
+ HashPartitionTaskState rState = (HashPartitionTaskState) env.getTaskState(new TaskId(
+ new ActivityId(getOperatorId(), RPARTITION_ACTIVITY_ID), partition));
+ HashPartitionTaskState sState = (HashPartitionTaskState) env.getTaskState(new TaskId(
+ new ActivityId(getOperatorId(), SPARTITION_ACTIVITY_ID), partition));
+ buildWriters = sState.fWriters;
+ probeWriters = rState.fWriters;
ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
@@ -310,12 +344,6 @@
writer.close();
}
}
-
- @Override
- public void deinitialize() throws HyracksDataException {
- env.set(RELATION1, null);
- env.set(RELATION0, null);
- }
};
return op;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 720ea99..23064f0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -14,12 +14,16 @@
*/
package edu.uci.ics.hyracks.dataflow.std.join;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -32,6 +36,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -43,15 +48,14 @@
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final String JOINER0 = "joiner0";
- private static final String BUILDRELATION = "BuildRel";
- private static final String PROBERELATION = "ProbeRel";
- private static final String MEM_HASHTABLE = "MEMORY_HASHTABLE";
- private static final String NUM_PARTITION = "NUMBER_B_PARTITIONS"; // B
+ private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
+ private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
+
private final int memsize;
private static final long serialVersionUID = 1L;
private final int inputsize0;
@@ -117,8 +121,10 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId, 0), BUILDRELATION);
- PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId, 1), PROBERELATION);
+ BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId,
+ BUILD_AND_PARTITION_ACTIVITY_ID));
+ PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId,
+ PARTITION_AND_JOIN_ACTIVITY_ID));
builder.addActivity(phase1);
builder.addSourceEdge(1, phase1, 0);
@@ -131,18 +137,41 @@
builder.addTargetEdge(0, phase2, 0);
}
+ public static class BuildAndPartitionTaskState extends AbstractTaskState {
+ private RunFileWriter[] fWriters;
+ private InMemoryHashJoin joiner;
+ private int nPartitions;
+ private int memoryForHashtable;
+
+ public BuildAndPartitionTaskState() {
+ }
+
+ private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
private class BuildAndPartitionActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- private String relationName;
- public BuildAndPartitionActivityNode(ActivityId id, String relationName) {
+ public BuildAndPartitionActivityNode(ActivityId id) {
super(id);
- this.relationName = relationName;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -157,7 +186,8 @@
}
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private InMemoryHashJoin joiner0;
+ private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
+ .getJobId(), new TaskId(getActivityId(), partition));
private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
hashFunctionFactories).createPartitioner();
@@ -165,16 +195,13 @@
private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
private ByteBuffer[] bufferForPartitions;
private final ByteBuffer inBuffer = ctx.allocateFrame();
- private RunFileWriter[] fWriters;
- private int memoryForHashtable;
- private int B;
@Override
public void close() throws HyracksDataException {
- if (memoryForHashtable != 0)
+ if (state.memoryForHashtable != 0)
build(inBuffer);
- for (int i = 0; i < B; i++) {
+ for (int i = 0; i < state.nPartitions; i++) {
ByteBuffer buf = bufferForPartitions[i];
accessorBuild.reset(buf);
if (accessorBuild.getTupleCount() > 0) {
@@ -183,22 +210,19 @@
closeWriter(i);
}
- env.set(relationName, fWriters);
- env.set(JOINER0, joiner0);
- env.set(NUM_PARTITION, B);
- env.set(MEM_HASHTABLE, memoryForHashtable);
+ env.setTaskState(state);
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- if (memoryForHashtable != memsize - 2) {
+ if (state.memoryForHashtable != memsize - 2) {
accessorBuild.reset(buffer);
int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
int entry = -1;
- if (memoryForHashtable == 0) {
- entry = hpcBuild.partition(accessorBuild, i, B);
+ if (state.memoryForHashtable == 0) {
+ entry = hpcBuild.partition(accessorBuild, i, state.nPartitions);
boolean newBuffer = false;
ByteBuffer bufBi = bufferForPartitions[entry];
while (true) {
@@ -213,7 +237,7 @@
}
} else {
entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
- if (entry < memoryForHashtable) {
+ if (entry < state.memoryForHashtable) {
while (true) {
if (!ftappender.append(accessorBuild, i)) {
build(inBuffer);
@@ -224,7 +248,7 @@
}
}
} else {
- entry %= B;
+ entry %= state.nPartitions;
boolean newBuffer = false;
ByteBuffer bufBi = bufferForPartitions[entry];
while (true) {
@@ -250,27 +274,27 @@
private void build(ByteBuffer inBuffer) throws HyracksDataException {
ByteBuffer copyBuffer = ctx.allocateFrame();
FrameUtils.copy(inBuffer, copyBuffer);
- joiner0.build(copyBuffer);
+ state.joiner.build(copyBuffer);
}
@Override
public void open() throws HyracksDataException {
if (memsize > 1) {
if (memsize > inputsize0) {
- B = 0;
+ state.nPartitions = 0;
} else {
- B = (int) (Math.ceil((double) (inputsize0 * factor / nPartitions - memsize)
+ state.nPartitions = (int) (Math.ceil((double) (inputsize0 * factor / nPartitions - memsize)
/ (double) (memsize - 1)));
}
- if (B <= 0) {
+ if (state.nPartitions <= 0) {
// becomes in-memory HJ
- memoryForHashtable = memsize - 2;
- B = 0;
+ state.memoryForHashtable = memsize - 2;
+ state.nPartitions = 0;
} else {
- memoryForHashtable = memsize - B - 2;
- if (memoryForHashtable < 0) {
- memoryForHashtable = 0;
- B = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
+ state.memoryForHashtable = memsize - state.nPartitions - 2;
+ if (state.memoryForHashtable < 0) {
+ state.memoryForHashtable = 0;
+ state.nPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
}
}
} else {
@@ -281,13 +305,14 @@
.createPartitioner();
ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
.createPartitioner();
- int tableSize = (int) (memoryForHashtable * recordsPerFrame * factor);
- joiner0 = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
- hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
- keys0, keys1, comparators), isLeftOuter, nullWriters1);
- bufferForPartitions = new ByteBuffer[B];
- fWriters = new RunFileWriter[B];
- for (int i = 0; i < B; i++) {
+ int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
+ state.joiner = new InMemoryHashJoin(ctx, tableSize,
+ new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
+ ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
+ comparators), isLeftOuter, nullWriters1);
+ bufferForPartitions = new ByteBuffer[state.nPartitions];
+ state.fWriters = new RunFileWriter[state.nPartitions];
+ for (int i = 0; i < state.nPartitions; i++) {
bufferForPartitions[i] = ctx.allocateFrame();
}
@@ -299,19 +324,20 @@
}
private void closeWriter(int i) throws HyracksDataException {
- RunFileWriter writer = fWriters[i];
+ RunFileWriter writer = state.fWriters[i];
if (writer != null) {
writer.close();
}
}
private void write(int i, ByteBuffer head) throws HyracksDataException {
- RunFileWriter writer = fWriters[i];
+ RunFileWriter writer = state.fWriters[i];
if (writer == null) {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(relationName);
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ BuildAndPartitionActivityNode.class.getSimpleName());
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
- fWriters[i] = writer;
+ state.fWriters[i] = writer;
}
writer.nextFrame(head);
}
@@ -322,16 +348,14 @@
private class PartitionAndJoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- private String relationName;
- public PartitionAndJoinActivityNode(ActivityId id, String relationName) {
+ public PartitionAndJoinActivityNode(ActivityId id) {
super(id);
- this.relationName = relationName;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -346,7 +370,7 @@
}
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private InMemoryHashJoin joiner0;
+ private BuildAndPartitionTaskState state;
private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
hashFunctionFactories);
@@ -361,19 +385,16 @@
private RunFileWriter[] buildWriters;
private RunFileWriter[] probeWriters;
private ByteBuffer[] bufferForPartitions;
- private int B;
- private int memoryForHashtable;
@Override
public void open() throws HyracksDataException {
- joiner0 = (InMemoryHashJoin) env.get(JOINER0);
+ state = (BuildAndPartitionTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ BUILD_AND_PARTITION_ACTIVITY_ID), partition));
writer.open();
- buildWriters = (RunFileWriter[]) env.get(BUILDRELATION);
- B = (Integer) env.get(NUM_PARTITION);
- memoryForHashtable = (Integer) env.get(MEM_HASHTABLE);
- probeWriters = new RunFileWriter[B];
- bufferForPartitions = new ByteBuffer[B];
- for (int i = 0; i < B; i++) {
+ buildWriters = state.fWriters;
+ probeWriters = new RunFileWriter[state.nPartitions];
+ bufferForPartitions = new ByteBuffer[state.nPartitions];
+ for (int i = 0; i < state.nPartitions; i++) {
bufferForPartitions[i] = ctx.allocateFrame();
}
appender.reset(outBuffer, true);
@@ -382,14 +403,14 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- if (memoryForHashtable != memsize - 2) {
+ if (state.memoryForHashtable != memsize - 2) {
accessorProbe.reset(buffer);
int tupleCount0 = accessorProbe.getTupleCount();
for (int i = 0; i < tupleCount0; ++i) {
int entry = -1;
- if (memoryForHashtable == 0) {
- entry = hpcProbe.partition(accessorProbe, i, B);
+ if (state.memoryForHashtable == 0) {
+ entry = hpcProbe.partition(accessorProbe, i, state.nPartitions);
boolean newBuffer = false;
ByteBuffer outbuf = bufferForPartitions[entry];
while (true) {
@@ -404,17 +425,17 @@
}
} else {
entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
- if (entry < memoryForHashtable) {
+ if (entry < state.memoryForHashtable) {
while (true) {
if (!ftap.append(accessorProbe, i)) {
- joiner0.join(inBuffer, writer);
+ state.joiner.join(inBuffer, writer);
ftap.reset(inBuffer, true);
} else
break;
}
} else {
- entry %= B;
+ entry %= state.nPartitions;
boolean newBuffer = false;
ByteBuffer outbuf = bufferForPartitions[entry];
while (true) {
@@ -431,18 +452,20 @@
}
}
} else {
- joiner0.join(buffer, writer);
+ state.joiner.join(buffer, writer);
}
}
@Override
public void close() throws HyracksDataException {
- joiner0.join(inBuffer, writer);
- joiner0.closeJoin(writer);
- ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(B, hpcf0).createPartitioner();
- ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(B, hpcf1).createPartitioner();
- if (memoryForHashtable != memsize - 2) {
- for (int i = 0; i < B; i++) {
+ state.joiner.join(inBuffer, writer);
+ state.joiner.closeJoin(writer);
+ ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
+ .createPartitioner();
+ ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
+ .createPartitioner();
+ if (state.memoryForHashtable != memsize - 2) {
+ for (int i = 0; i < state.nPartitions; i++) {
ByteBuffer buf = bufferForPartitions[i];
accessorProbe.reset(buf);
if (accessorProbe.getTupleCount() > 0) {
@@ -453,12 +476,12 @@
inBuffer.clear();
int tableSize = -1;
- if (memoryForHashtable == 0) {
- tableSize = (int) (B * recordsPerFrame * factor);
+ if (state.memoryForHashtable == 0) {
+ tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
} else {
tableSize = (int) (memsize * recordsPerFrame * factor);
}
- for (int partitionid = 0; partitionid < B; partitionid++) {
+ for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
RunFileWriter buildWriter = buildWriters[partitionid];
RunFileWriter probeWriter = probeWriters[partitionid];
if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
@@ -494,11 +517,6 @@
}
}
writer.close();
- env.set(PROBERELATION, null);
- env.set(BUILDRELATION, null);
- env.set(JOINER0, null);
- env.set(MEM_HASHTABLE, null);
- env.set(NUM_PARTITION, null);
}
private void closeWriter(int i) throws HyracksDataException {
@@ -511,7 +529,8 @@
private void write(int i, ByteBuffer head) throws HyracksDataException {
RunFileWriter writer = probeWriters[i];
if (writer == null) {
- FileReference file = ctx.createManagedWorkspaceFile(relationName);
+ FileReference file = ctx.createManagedWorkspaceFile(PartitionAndJoinActivityNode.class
+ .getSimpleName());
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
probeWriters[i] = writer;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 5a28d1c..4a13e16 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -14,12 +14,16 @@
*/
package edu.uci.ics.hyracks.dataflow.std.join;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -30,6 +34,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -37,11 +42,13 @@
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final String JOINER = "joiner";
+ private static final int BUILD_ACTIVITY_ID = 0;
+ private static final int PROBE_ACTIVITY_ID = 1;
private static final long serialVersionUID = 1L;
private final int[] keys0;
@@ -97,6 +104,27 @@
builder.addBlockingEdge(hba, hpa);
}
+ public static class HashBuildTaskState extends AbstractTaskState {
+ private InMemoryHashJoin joiner;
+
+ public HashBuildTaskState() {
+ }
+
+ private HashBuildTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private class HashBuildActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@@ -106,9 +134,11 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ final RecordDescriptor rd0 = recordDescProvider
+ .getInputRecordDescriptor(getOperatorId(), BUILD_ACTIVITY_ID);
+ final RecordDescriptor rd1 = recordDescProvider
+ .getInputRecordDescriptor(getOperatorId(), PROBE_ACTIVITY_ID);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -121,7 +151,7 @@
}
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private InMemoryHashJoin joiner;
+ private HashBuildTaskState state;
@Override
public void open() throws HyracksDataException {
@@ -129,21 +159,24 @@
.createPartitioner();
ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
.createPartitioner();
- joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
- hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
- keys0, keys1, comparators), isLeftOuter, nullWriters1);
+ state = new HashBuildTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+ partition));
+ state.joiner = new InMemoryHashJoin(ctx, tableSize,
+ new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
+ ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
+ comparators), isLeftOuter, nullWriters1);
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
ByteBuffer copyBuffer = ctx.allocateFrame();
FrameUtils.copy(buffer, copyBuffer);
- joiner.build(copyBuffer);
+ state.joiner.build(copyBuffer);
}
@Override
public void close() throws HyracksDataException {
- env.set(JOINER, joiner);
+ env.setTaskState(state);
}
@Override
@@ -163,26 +196,26 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private InMemoryHashJoin joiner;
+ private HashBuildTaskState state;
@Override
public void open() throws HyracksDataException {
- joiner = (InMemoryHashJoin) env.get(JOINER);
+ state = (HashBuildTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ BUILD_ACTIVITY_ID), partition));
writer.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- joiner.join(buffer, writer);
+ state.joiner.join(buffer, writer);
}
@Override
public void close() throws HyracksDataException {
- joiner.closeJoin(writer);
+ state.joiner.closeJoin(writer);
writer.close();
- env.set(JOINER, null);
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 6cabbe6..de1b0ea 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -15,28 +15,35 @@
package edu.uci.ics.hyracks.dataflow.std.join;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final String JOINER = "joiner";
+ private static final int JOIN_CACHE_ACTIVITY_ID = 0;
+ private static final int NL_JOIN_ACTIVITY_ID = 1;
private static final long serialVersionUID = 1L;
private final ITuplePairComparatorFactory comparatorFactory;
@@ -52,8 +59,9 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- JoinCacheActivityNode jc = new JoinCacheActivityNode(new ActivityId(getOperatorId(), 0));
- NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode(new ActivityId(getOperatorId(), 1));
+ JoinCacheActivityNode jc = new JoinCacheActivityNode(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID));
+ NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode(new ActivityId(getOperatorId(),
+ NL_JOIN_ACTIVITY_ID));
builder.addActivity(jc);
builder.addSourceEdge(1, jc, 0);
@@ -65,6 +73,27 @@
builder.addBlockingEdge(jc, nlj);
}
+ public static class JoinCacheTaskState extends AbstractTaskState {
+ private NestedLoopJoin joiner;
+
+ public JoinCacheTaskState() {
+ }
+
+ private JoinCacheTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private class JoinCacheActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@@ -74,17 +103,19 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator();
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private NestedLoopJoin joiner;
+ private JoinCacheTaskState state;
@Override
public void open() throws HyracksDataException {
- joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+ state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+ partition));
+ state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
}
@@ -93,13 +124,13 @@
ByteBuffer copyBuffer = ctx.allocateFrame();
FrameUtils.copy(buffer, copyBuffer);
FrameUtils.makeReadable(copyBuffer);
- joiner.cache(copyBuffer);
+ state.joiner.cache(copyBuffer);
}
@Override
public void close() throws HyracksDataException {
- joiner.closeCache();
- env.set(JOINER, joiner);
+ state.joiner.closeCache();
+ env.setTaskState(state);
}
@Override
@@ -119,27 +150,27 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private NestedLoopJoin joiner;
+ private JoinCacheTaskState state;
@Override
public void open() throws HyracksDataException {
- joiner = (NestedLoopJoin) env.get(JOINER);
+ state = (JoinCacheTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ JOIN_CACHE_ACTIVITY_ID), partition));
writer.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- joiner.join(buffer, writer);
+ state.joiner.join(buffer, writer);
}
@Override
public void close() throws HyracksDataException {
- joiner.closeJoin(writer);
+ state.joiner.closeJoin(writer);
writer.close();
- env.set(JOINER, null);
}
@Override
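
Note: every operator touched by this patch follows the same producer/consumer protocol for task state, shown concretely in the hunks above. A minimal sketch of the pattern, assuming only the ITaskState/IOperatorEnvironment API from this change; MyState, payload, result, and PRODUCER_ACTIVITY_ID are illustrative placeholders, not part of the patch:

    // Illustrative only: a trivial state class in the shape of JoinCacheTaskState.
    public static class MyState extends AbstractTaskState {
        Object payload;

        public MyState() {
        }

        MyState(JobId jobId, TaskId taskId) {
            super(jobId, taskId);
        }

        @Override
        public void toBytes(DataOutput out) throws IOException {
            // no-op, matching the operators in this patch
        }

        @Override
        public void fromBytes(DataInput in) throws IOException {
            // no-op
        }
    }

    // Producer activity, typically in close(): key the state by (jobId, own taskId).
    MyState state = new MyState(ctx.getJobletContext().getJobId(),
            new TaskId(getActivityId(), partition));
    state.payload = result; // whatever the downstream activity needs
    env.setTaskState(state);

    // Consumer activity, typically in open()/initialize(): rebuild the producer's
    // TaskId from the shared operator id, the producer's activity id constant, and
    // the same partition, then look the state up.
    MyState fetched = (MyState) env.getTaskState(new TaskId(
            new ActivityId(getOperatorId(), PRODUCER_ACTIVITY_ID), partition));
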
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 448cd8a..32bfc09 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -14,28 +14,36 @@
*/
package edu.uci.ics.hyracks.dataflow.std.misc;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor {
private static final long serialVersionUID = 1L;
- protected static final String MATERIALIZED_FILE = "materialized-file";
+
+ private static final int MATERIALIZER_ACTIVITY_ID = 0;
+ private static final int READER_ACTIVITY_ID = 1;
public MaterializingOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
super(spec, 1, 1);
@@ -44,8 +52,8 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, 0));
- ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, 1));
+ MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
+ ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
builder.addActivity(ma);
builder.addSourceEdge(0, ma, 0);
@@ -56,6 +64,27 @@
builder.addBlockingEdge(ma, ra);
}
+ public static class MaterializerTaskState extends AbstractTaskState {
+ private RunFileWriter out;
+
+ public MaterializerTaskState() {
+ }
+
+ private MaterializerTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
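+ // No-op: this state only holds a handle to a node-local run file; it is not serialized.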
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
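+ // No-op: nothing to restore.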
+
+ }
+ }
+
private final class MaterializerActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@@ -65,27 +94,29 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private RunFileWriter out;
+ private MaterializerTaskState state;
@Override
public void open() throws HyracksDataException {
+ state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+ partition));
FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
MaterializingOperatorDescriptor.class.getSimpleName());
- out = new RunFileWriter(file, ctx.getIOManager());
- out.open();
+ state.out = new RunFileWriter(file, ctx.getIOManager());
+ state.out.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- out.nextFrame(buffer);
+ state.out.nextFrame(buffer);
}
@Override
public void close() throws HyracksDataException {
- out.close();
- env.set(MATERIALIZED_FILE, out);
+ state.out.close();
+ env.setTaskState(state);
}
@Override
@@ -104,13 +135,14 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
ByteBuffer frame = ctx.allocateFrame();
- RunFileWriter out = (RunFileWriter) env.get(MATERIALIZED_FILE);
- RunFileReader in = out.createReader();
+ MaterializerTaskState state = (MaterializerTaskState) env.getTaskState(new TaskId(new ActivityId(
+ getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
+ RunFileReader in = state.out.createReader();
writer.open();
try {
in.open();
@@ -130,7 +162,6 @@
@Override
public void deinitialize() throws HyracksDataException {
- env.set(MATERIALIZED_FILE, null);
}
};
}
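
Note: the reader's deinitialize() is emptied above because cleanup responsibility moved. With string keys the reader had to null the environment entry itself to release the materialized file; a registered task state is tracked by the infrastructure per (jobId, taskId) and is presumably reclaimed with the job, so the set/unset pair collapses to a single registration:

    // Before: symmetric set/unset around the materialized file's lifetime.
    //   env.set(MATERIALIZED_FILE, out);  // writer, in close()
    //   env.set(MATERIALIZED_FILE, null); // reader, in deinitialize()
    // After: one registration, keyed by the writer task's TaskId; no manual unset.
    env.setTaskState(state);
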
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index a06bb45..351c8a6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -14,25 +14,52 @@
*/
package edu.uci.ics.hyracks.dataflow.std.misc;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.ArrayList;
-import java.util.List;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
public class SplitVectorOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final int COLLECT_ACTIVITY_ID = 0;
+ private static final int SPLIT_ACTIVITY_ID = 1;
+
+ public static class CollectTaskState extends AbstractTaskState {
+ private ArrayList<Object[]> buffer;
+
+ public CollectTaskState() {
+ }
+
+ private CollectTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
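+ // No-op: the collected tuples are kept in memory only and are not serialized.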
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
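+ // No-op: nothing to restore.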
+ }
+ }
+
private class CollectActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@@ -46,10 +73,10 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
- private ArrayList<Object[]> buffer;
+ private CollectTaskState state;
@Override
public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
@@ -58,18 +85,19 @@
@Override
public void open() throws HyracksDataException {
- buffer = new ArrayList<Object[]>();
- env.set(BUFFER, buffer);
+ state = new CollectTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+ partition));
+ state.buffer = new ArrayList<Object[]>();
}
@Override
public void close() throws HyracksDataException {
-
+ env.setTaskState(state);
}
@Override
public void writeData(Object[] data) throws HyracksDataException {
- buffer.add(data);
+ state.buffer.add(data);
}
@Override
@@ -91,10 +119,12 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
private IOpenableDataWriter<Object[]> writer;
+ private CollectTaskState state;
+
@Override
public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
if (index != 0) {
@@ -105,6 +135,8 @@
@Override
public void open() throws HyracksDataException {
+ state = (CollectTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ COLLECT_ACTIVITY_ID), partition));
}
@Override
@@ -113,12 +145,11 @@
@Override
public void writeData(Object[] data) throws HyracksDataException {
- List<Object[]> buffer = (List<Object[]>) env.get(BUFFER);
- int n = buffer.size();
+ int n = state.buffer.size();
int step = (int) Math.floor(n / (float) splits);
writer.open();
for (int i = 0; i < splits; ++i) {
- writer.writeData(buffer.get(step * (i + 1) - 1));
+ writer.writeData(state.buffer.get(step * (i + 1) - 1));
}
writer.close();
}
@@ -133,8 +164,6 @@
}
}
- private static final String BUFFER = "buffer";
-
private static final long serialVersionUID = 1L;
private final int splits;
@@ -147,8 +176,8 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- CollectActivity ca = new CollectActivity(new ActivityId(odId, 0));
- SplitActivity sa = new SplitActivity(new ActivityId(odId, 1));
+ CollectActivity ca = new CollectActivity(new ActivityId(odId, COLLECT_ACTIVITY_ID));
+ SplitActivity sa = new SplitActivity(new ActivityId(odId, SPLIT_ACTIVITY_ID));
builder.addActivity(ca);
builder.addSourceEdge(0, ca, 0);
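
Note: SplitActivity's split-point arithmetic is unchanged apart from reading state.buffer: it takes the last element of each of `splits` chunks of `step` elements (any remainder past step * splits is ignored). A standalone check of the indices; SplitPointCheck is a hypothetical harness, not part of the patch:

    public class SplitPointCheck {
        public static void main(String[] args) {
            int n = 9;      // tuples gathered by CollectActivity
            int splits = 3; // requested split points
            int step = (int) Math.floor(n / (float) splits); // = 3
            for (int i = 0; i < splits; ++i) {
                // last element of each chunk: prints 2, 5, 8
                System.out.println(step * (i + 1) - 1);
            }
        }
    }
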
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 55659a4..52b292b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -14,6 +14,9 @@
*/
package edu.uci.ics.hyracks.dataflow.std.sort;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -22,6 +25,7 @@
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
@@ -29,17 +33,20 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class ExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final String FRAMESORTER = "framesorter";
- private static final String RUNS = "runs";
-
private static final long serialVersionUID = 1L;
+
+ private static final int SORT_ACTIVITY_ID = 0;
+ private static final int MERGE_ACTIVITY_ID = 1;
+
private final int[] sortFields;
private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
private final IBinaryComparatorFactory[] comparatorFactories;
@@ -66,8 +73,8 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- SortActivity sa = new SortActivity(new ActivityId(odId, 0));
- MergeActivity ma = new MergeActivity(new ActivityId(odId, 1));
+ SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
+ MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
builder.addActivity(sa);
builder.addSourceEdge(0, sa, 0);
@@ -78,6 +85,28 @@
builder.addBlockingEdge(sa, ma);
}
+ public static class SortTaskState extends AbstractTaskState {
+ private List<IFrameReader> runs;
+ private FrameSorter frameSorter;
+
+ public SortTaskState() {
+ }
+
+ private SortTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
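+ // No-op: the runs and the frame sorter are node-local resources and are not serialized.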
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
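+ // No-op: nothing to restore.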
+
+ }
+ }
+
private class SortActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@@ -87,12 +116,14 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- final ExternalSortRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields,
- firstKeyNormalizerFactory, comparatorFactories, recordDescriptors[0], framesLimit);
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ private ExternalSortRunGenerator runGen;
+
@Override
public void open() throws HyracksDataException {
+ runGen = new ExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
+ comparatorFactories, recordDescriptors[0], framesLimit);
runGen.open();
}
@@ -103,9 +134,12 @@
@Override
public void close() throws HyracksDataException {
+ SortTaskState state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(
+ getActivityId(), partition));
runGen.close();
- env.set(FRAMESORTER, runGen.getFrameSorter());
- env.set(RUNS, runGen.getRuns());
+ state.runs = runGen.getRuns();
+ state.frameSorter = runGen.getFrameSorter();
+ env.setTaskState(state);
}
@Override
@@ -126,12 +160,14 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- List<IFrameReader> runs = (List<IFrameReader>) env.get(RUNS);
- FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
+ SortTaskState state = (SortTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ SORT_ACTIVITY_ID), partition));
+ List<IFrameReader> runs = state.runs;
+ FrameSorter frameSorter = state.frameSorter;
IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -139,8 +175,6 @@
ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
comparators, recordDescriptors[0], framesLimit, writer);
merger.process();
- env.set(FRAMESORTER, null);
- env.set(RUNS, null);
}
};
return op;
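
Note: besides the state plumbing, this hunk changes the run generator's lifetime: ExternalSortRunGenerator was built in createPushRuntime() and captured by the pushable, and is now constructed in open(), tying it to the task's actual execution. Both of its products then travel in one SortTaskState; names as in the hunks above:

    // SortActivity.close()
    state.runs = runGen.getRuns();               // spilled runs, possibly empty
    state.frameSorter = runGen.getFrameSorter(); // in-memory sorted frames
    env.setTaskState(state);

    // MergeActivity.initialize() recovers both with a single lookup.
    SortTaskState state = (SortTaskState) env.getTaskState(new TaskId(
            new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition));
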
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 75929ce..eafe738 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -14,28 +14,36 @@
*/
package edu.uci.ics.hyracks.dataflow.std.sort;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final String FRAMESORTER = "framesorter";
-
private static final long serialVersionUID = 1L;
+
+ private static final int SORT_ACTIVITY_ID = 0;
+ private static final int MERGE_ACTIVITY_ID = 1;
+
private final int[] sortFields;
private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
private IBinaryComparatorFactory[] comparatorFactories;
@@ -57,8 +65,8 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- SortActivity sa = new SortActivity(new ActivityId(odId, 0));
- MergeActivity ma = new MergeActivity(new ActivityId(odId, 1));
+ SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
+ MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
builder.addActivity(sa);
builder.addSourceEdge(0, sa, 0);
@@ -69,6 +77,25 @@
builder.addBlockingEdge(sa, ma);
}
+ public static class SortTaskState extends AbstractTaskState {
+ private FrameSorter frameSorter;
+
+ public SortTaskState() {
+ }
+
+ private SortTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
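+ // No-op: the frame sorter is memory-resident and is not serialized.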
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
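+ // No-op: nothing to restore.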
+ }
+ }
+
private class SortActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@@ -78,24 +105,27 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- final FrameSorter frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, recordDescriptors[0]);
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ private SortTaskState state;
+
@Override
public void open() throws HyracksDataException {
- frameSorter.reset();
+ state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
+ state.frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory,
+ comparatorFactories, recordDescriptors[0]);
+ state.frameSorter.reset();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- frameSorter.insertFrame(buffer);
+ state.frameSorter.insertFrame(buffer);
}
@Override
public void close() throws HyracksDataException {
- frameSorter.sortFrames();
- env.set(FRAMESORTER, frameSorter);
+ state.frameSorter.sortFrames();
+ env.setTaskState(state);
}
@Override
@@ -115,21 +145,21 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
writer.open();
try {
- FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
- frameSorter.flushFrames(writer);
+ SortTaskState state = (SortTaskState) env.getTaskState(new TaskId(new ActivityId(
+ getOperatorId(), SORT_ACTIVITY_ID), partition));
+ state.frameSorter.flushFrames(writer);
} catch (Exception e) {
writer.fail();
throw new HyracksDataException(e);
} finally {
writer.close();
}
- env.set(FRAMESORTER, null);
}
};
return op;
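
Note: the same lifetime change applies here: the FrameSorter, previously created once in createPushRuntime(), is now owned by the per-task SortTaskState and built in open(). The two-activity wiring is the common shape for every operator in this patch; reconstructed for orientation from the visible hunk, with the addTargetEdge call inferred from the elided context rather than shown above:

    SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
    MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
    builder.addActivity(sa);
    builder.addSourceEdge(0, sa, 0); // operator input 0 feeds the sort
    builder.addActivity(ma);
    builder.addTargetEdge(0, ma, 0); // merge drives operator output 0 (inferred)
    builder.addBlockingEdge(sa, ma); // merge may not start before the sort completes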