merged r1422:1429 from hyracks_asterix_stabilization
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1431 123451ca-8445-de46-9d55-352943316053
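
Summary of the merge: ITaskState becomes IStateObject and is keyed by an
arbitrary Object id instead of a TaskId (IOperatorEnvironment gains
setStateObject/getStateObject); AbstractTaskState is replaced by
AbstractStateObject; the anonymous build/merge pushables in
ExternalGroupOperatorDescriptor and HashGroupOperatorDescriptor are extracted
into the top-level classes ExternalGroupBuildOperatorNodePushable,
ExternalGroupMergeOperatorNodePushable and HashGroupBuildOperatorNodePushable,
with their shared state carried by ExternalGroupState/HashGroupState; and
JobScheduler now hands each node a copy of the connector-policy map.

As a sketch of the new hand-off pattern between two activities of one operator
(MyState and PRODUCER_ACTIVITY_ID are illustrative names, not part of this
change; MyState would extend AbstractStateObject):

    // producing activity, typically in close():
    MyState state = new MyState(ctx.getJobletContext().getJobId(),
            new TaskId(getActivityId(), partition));
    ctx.setStateObject(state);

    // consuming activity, typically in initialize():
    MyState state = (MyState) ctx.getStateObject(new TaskId(new ActivityId(
            getOperatorId(), PRODUCER_ACTIVITY_ID), partition));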
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java
similarity index 89%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java
index c24e488..f216773 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java
@@ -18,13 +18,12 @@
import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.job.JobId;
-public interface ITaskState {
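+/**
+ * A handle for state that one activity of an operator registers with its task
+ * context and another activity later retrieves by id (see
+ * IOperatorEnvironment.setStateObject/getStateObject).
+ */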
+public interface IStateObject {
public JobId getJobId();
- public TaskId getTaskId();
+ public Object getId();
public long getMemoryOccupancy();
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
index ecc44c1..eaa1ec2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
@@ -14,11 +14,10 @@
*/
package edu.uci.ics.hyracks.api.job;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
public interface IOperatorEnvironment {
- public void setTaskState(ITaskState taskState);
+    public void setStateObject(IStateObject stateObject);
- public ITaskState getTaskState(TaskId taskId);
+ public IStateObject getStateObject(Object id);
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index f2c6993..a72f64c 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -465,7 +465,8 @@
final JobId jobId = jobRun.getJobId();
final JobActivityGraph jag = jobRun.getJobActivityGraph();
final String appName = jag.getApplicationName();
- final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
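+        // Hand each node a private copy of the connector-policy map rather
+        // than a reference to the job run's live map.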
+ final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
+ jobRun.getConnectorPolicyMap());
for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
String nodeId = entry.getKey();
final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 8f7bc4a..c62b446 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -26,8 +26,7 @@
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -65,7 +64,7 @@
private final IOperatorEnvironment env;
- private final Map<TaskId, ITaskState> taskStateMap;
+ private final Map<Object, IStateObject> stateObjectMap;
private final Map<TaskAttemptId, Task> taskMap;
@@ -88,7 +87,7 @@
this.jag = jag;
partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
env = new OperatorEnvironmentImpl(nodeController.getId());
- taskStateMap = new HashMap<TaskId, ITaskState>();
+ stateObjectMap = new HashMap<Object, IStateObject>();
taskMap = new HashMap<TaskAttemptId, Task>();
counterMap = new HashMap<String, Counter>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
@@ -136,13 +135,13 @@
}
@Override
- public void setTaskState(ITaskState taskState) {
- taskStateMap.put(taskState.getTaskId(), taskState);
+    public void setStateObject(IStateObject stateObject) {
+        stateObjectMap.put(stateObject.getId(), stateObject);
}
@Override
- public ITaskState getTaskState(TaskId taskId) {
- return taskStateMap.get(taskId);
+ public IStateObject getStateObject(Object id) {
+ return stateObjectMap.get(id);
}
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 1bb0b2f..7addda8 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -33,8 +33,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -333,12 +332,12 @@
}
@Override
- public void setTaskState(ITaskState taskState) {
- opEnv.setTaskState(taskState);
+    public void setStateObject(IStateObject stateObject) {
+        opEnv.setStateObject(stateObject);
}
@Override
- public ITaskState getTaskState(TaskId taskId) {
- return opEnv.getTaskState(taskId);
+ public IStateObject getStateObject(Object id) {
+ return opEnv.getStateObject(id);
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractStateObject.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractStateObject.java
new file mode 100644
index 0000000..753fb98
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractStateObject.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.job.JobId;
+
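+/**
+ * Convenience base class for IStateObject implementations: stores the owning
+ * job id, the opaque state id, and the reported memory occupancy.
+ */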
+public abstract class AbstractStateObject implements IStateObject {
+ protected JobId jobId;
+
+ protected Object id;
+
+ protected long memoryOccupancy;
+
+ protected AbstractStateObject() {
+ }
+
+ protected AbstractStateObject(JobId jobId, Object id) {
+ this.jobId = jobId;
+ this.id = id;
+ }
+
+ @Override
+ public final JobId getJobId() {
+ return jobId;
+ }
+
+ public final void setJobId(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public final Object getId() {
+ return id;
+ }
+
+ public final void setId(Object id) {
+ this.id = id;
+ }
+
+ @Override
+ public final long getMemoryOccupancy() {
+ return memoryOccupancy;
+ }
+
+ public void setMemoryOccupancy(long memoryOccupancy) {
+ this.memoryOccupancy = memoryOccupancy;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java
deleted file mode 100644
index ba4dfd1..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package edu.uci.ics.hyracks.dataflow.std.base;
-
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
-import edu.uci.ics.hyracks.api.job.JobId;
-
-public abstract class AbstractTaskState implements ITaskState {
- protected JobId jobId;
-
- protected TaskId taId;
-
- protected long memoryOccupancy;
-
- protected AbstractTaskState() {
- }
-
- protected AbstractTaskState(JobId jobId, TaskId taId) {
- this.jobId = jobId;
- this.taId = taId;
- }
-
- @Override
- public final JobId getJobId() {
- return jobId;
- }
-
- public final void setJobId(JobId jobId) {
- this.jobId = jobId;
- }
-
- @Override
- public final TaskId getTaskId() {
- return taId;
- }
-
- public final void setTaskId(TaskId tId) {
- this.taId = tId;
- }
-
- @Override
- public final long getMemoryOccupancy() {
- return memoryOccupancy;
- }
-
- public void setMemoryOccupancy(long memoryOccupancy) {
- this.memoryOccupancy = memoryOccupancy;
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupBuildOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupBuildOperatorNodePushable.java
new file mode 100644
index 0000000..f36f681
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupBuildOperatorNodePushable.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
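+/**
+ * Build phase of the external group-by: aggregates incoming frames into an
+ * in-memory spillable table, spills sorted runs to disk when the table
+ * overflows, and hands the table and run readers to the merge activity
+ * through an ExternalGroupState registered on the task context.
+ */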
+class ExternalGroupBuildOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final Object stateId;
+ private final int[] keyFields;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final INormalizedKeyComputerFactory firstNormalizerFactory;
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+ private final int framesLimit;
+ private final ISpillableTableFactory spillableTableFactory;
+ private final RecordDescriptor inRecordDescriptor;
+ private final RecordDescriptor outRecordDescriptor;
+ private final FrameTupleAccessor accessor;
+
+ private ExternalGroupState state;
+
+ ExternalGroupBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keyFields, int framesLimit,
+ IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
+ IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, ISpillableTableFactory spillableTableFactory) {
+ this.ctx = ctx;
+ this.stateId = stateId;
+ this.framesLimit = framesLimit;
+ this.aggregatorFactory = aggregatorFactory;
+ this.keyFields = keyFields;
+ this.comparatorFactories = comparatorFactories;
+ this.firstNormalizerFactory = firstNormalizerFactory;
+ this.spillableTableFactory = spillableTableFactory;
+ this.inRecordDescriptor = inRecordDescriptor;
+ this.outRecordDescriptor = outRecordDescriptor;
+ this.accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ state = new ExternalGroupState(ctx.getJobletContext().getJobId(), stateId);
+ state.setRuns(new LinkedList<RunFileReader>());
+ ISpillableTable table = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories,
+ firstNormalizerFactory, aggregatorFactory, inRecordDescriptor, outRecordDescriptor, framesLimit);
+ table.reset();
+ state.setSpillableTable(table);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ ISpillableTable gTable = state.getSpillableTable();
+ for (int i = 0; i < tupleCount; i++) {
+ /**
+ * If the group table is too large, flush the table into
+ * a run file.
+ */
+ if (!gTable.insert(accessor, i)) {
+ flushFramesToRun();
+ if (!gTable.insert(accessor, i))
+ throw new HyracksDataException("Failed to insert a new buffer into the aggregate operator!");
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ //do nothing for failures
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ ISpillableTable gTable = state.getSpillableTable();
+ if (gTable.getFrameCount() >= 0) {
+ if (state.getRuns().size() > 0) {
+ /**
+                 * Flush the in-memory table into a run file.
+ */
+ flushFramesToRun();
+ gTable.close();
+ gTable = null;
+ }
+ }
+ ctx.setStateObject(state);
+ }
+
+ private void flushFramesToRun() throws HyracksDataException {
+ FileReference runFile;
+ try {
+ runFile = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class.getSimpleName());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
+ writer.open();
+ ISpillableTable gTable = state.getSpillableTable();
+ try {
+ gTable.sortFrames();
+ gTable.flushFrames(writer, true);
+ } catch (Exception ex) {
+ throw new HyracksDataException(ex);
+ } finally {
+ writer.close();
+ }
+ gTable.reset();
+        state.getRuns().add(writer.createReader());
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupMergeOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupMergeOperatorNodePushable.java
new file mode 100644
index 0000000..d08ad0e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupMergeOperatorNodePushable.java
@@ -0,0 +1,458 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
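+/**
+ * Merge phase of the external group-by: fetches the ExternalGroupState left
+ * by the build phase, flushes the in-memory table directly when nothing was
+ * spilled, and otherwise merges the spilled runs pass by pass through a
+ * priority queue until the final pass emits fully aggregated output.
+ */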
+class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final Object stateId;
+ private final int[] keyFields;
+ private final IBinaryComparator[] comparators;
+ private final AggregateState aggregateState;
+ private final ArrayTupleBuilder tupleBuilder;
+ private final int[] storedKeys;
+ private final IAggregatorDescriptor aggregator;
+ private final boolean isOutputSorted;
+ private final int framesLimit;
+ private final RecordDescriptor outRecordDescriptor;
+ /**
+ * Input frames, one for each run file.
+ */
+ private List<ByteBuffer> inFrames;
+ /**
+ * Output frame.
+ */
+ private ByteBuffer outFrame, writerFrame;
+ private final FrameTupleAppender outAppender;
+ private FrameTupleAppender writerAppender;
+ private LinkedList<RunFileReader> runs;
+ private ExternalGroupState aggState;
+ private ArrayTupleBuilder finalTupleBuilder;
+ /**
+     * How many frames to read ahead at once for each run.
+ */
+ private int runFrameLimit = 1;
+ private int[] currentFrameIndexInRun;
+ private int[] currentRunFrames;
+ private final FrameTupleAccessor outFrameAccessor;
+
+ ExternalGroupMergeOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
+ IBinaryComparatorFactory[] comparatorFactories, int[] keyFields,
+ IAggregatorDescriptorFactory mergerFactory, boolean isOutputSorted, int framesLimit,
+ RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+ this.stateId = stateId;
+ this.keyFields = keyFields;
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ int[] keyFieldsInPartialResults = new int[keyFields.length];
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+ keyFieldsInPartialResults[i] = i;
+ }
+
+ aggregator = mergerFactory.createAggregator(ctx, outRecordDescriptor, outRecordDescriptor, keyFields,
+ keyFieldsInPartialResults);
+ aggregateState = aggregator.createAggregateStates();
+
+ storedKeys = new int[keyFields.length];
+ /**
+ * Get the list of the fields in the stored records.
+ */
+ for (int i = 0; i < keyFields.length; ++i) {
+ storedKeys[i] = i;
+ }
+
+ tupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+ this.ctx = ctx;
+ outAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
+ this.isOutputSorted = isOutputSorted;
+ this.framesLimit = framesLimit;
+ this.outRecordDescriptor = outRecordDescriptor;
+ }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+ aggState = (ExternalGroupState) ctx.getStateObject(stateId);
+ runs = aggState.getRuns();
+ writer.open();
+ try {
+ if (runs.size() <= 0) {
+ ISpillableTable gTable = aggState.getSpillableTable();
+ if (gTable != null) {
+ if (isOutputSorted)
+ gTable.sortFrames();
+ gTable.flushFrames(writer, false);
+ }
+ gTable = null;
+ aggState = null;
+ } else {
+ aggState = null;
+ runs = new LinkedList<RunFileReader>(runs);
+ inFrames = new ArrayList<ByteBuffer>();
+ outFrame = ctx.allocateFrame();
+ outAppender.reset(outFrame, true);
+ outFrameAccessor.reset(outFrame);
+ while (runs.size() > 0) {
+ try {
+ doPass(runs);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ inFrames.clear();
+ }
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ aggregateState.close();
+ writer.close();
+ }
+ }
+
+ private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
+ FileReference newRun = null;
+ IFrameWriter writer = this.writer;
+ boolean finalPass = false;
+
+ while (inFrames.size() + 2 < framesLimit) {
+ inFrames.add(ctx.allocateFrame());
+ }
+ int runNumber;
+ if (runs.size() + 2 <= framesLimit) {
+ finalPass = true;
+ runFrameLimit = (framesLimit - 2) / runs.size();
+ runNumber = runs.size();
+ } else {
+ runNumber = framesLimit - 2;
+ newRun = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class.getSimpleName());
+ writer = new RunFileWriter(newRun, ctx.getIOManager());
+ writer.open();
+ }
+ try {
+ currentFrameIndexInRun = new int[runNumber];
+ currentRunFrames = new int[runNumber];
+ /**
+ * Create file readers for each input run file, only for
+             * the ones that fit into the inFrames.
+ */
+ RunFileReader[] runFileReaders = new RunFileReader[runNumber];
+ FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+ Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+ ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), outRecordDescriptor,
+ runNumber, comparator);
+ /**
+ * current tuple index in each run
+ */
+ int[] tupleIndices = new int[runNumber];
+
+ for (int i = 0; i < runNumber; i++) {
+ int runIndex = topTuples.peek().getRunid();
+ tupleIndices[runIndex] = 0;
+ // Load the run file
+ runFileReaders[runIndex] = runs.get(runIndex);
+ runFileReaders[runIndex].open();
+
+ currentRunFrames[runIndex] = 0;
+ currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
+ for (int j = 0; j < runFrameLimit; j++) {
+ int frameIndex = currentFrameIndexInRun[runIndex] + j;
+ if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
+ tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
+ tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+ currentRunFrames[runIndex]++;
+ if (j == 0)
+ setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Start merging
+ */
+ while (!topTuples.areRunsExhausted()) {
+ /**
+ * Get the top record
+ */
+ ReferenceEntry top = topTuples.peek();
+ int tupleIndex = top.getTupleIndex();
+ int runIndex = topTuples.peek().getRunid();
+ FrameTupleAccessor fta = top.getAccessor();
+
+ int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+ if (currentTupleInOutFrame < 0
+ || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+ /**
+                     * Initialize the first output record: reset the
+                     * tuple builder.
+ */
+
+ tupleBuilder.reset();
+
+ for (int k = 0; k < storedKeys.length; k++) {
+ tupleBuilder.addField(fta, tupleIndex, storedKeys[k]);
+ }
+
+ aggregator.init(tupleBuilder, fta, tupleIndex, aggregateState);
+
+ if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ flushOutFrame(writer, finalPass);
+ if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ throw new HyracksDataException(
+ "The partial result is too large to be initialized in a frame.");
+ }
+ }
+
+ } else {
+ /**
+                     * If the new tuple is in the same group as the
+                     * current output tuple, merge it into that tuple
+                     * in the outFrame.
+ */
+
+ aggregator.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame, aggregateState);
+
+ }
+ tupleIndices[runIndex]++;
+ setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+ }
+
+ if (outAppender.getTupleCount() > 0) {
+ flushOutFrame(writer, finalPass);
+ outAppender.reset(outFrame, true);
+ }
+
+ aggregator.close();
+
+ runs.subList(0, runNumber).clear();
+ /**
+ * insert the new run file into the beginning of the run
+ * file list
+ */
+ if (!finalPass) {
+ runs.add(0, ((RunFileWriter) writer).createReader());
+ }
+ } finally {
+ if (!finalPass) {
+ writer.close();
+ }
+ }
+ }
+
+ private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
+
+ if (finalTupleBuilder == null) {
+ finalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+ }
+
+ if (writerFrame == null) {
+ writerFrame = ctx.allocateFrame();
+ }
+
+ if (writerAppender == null) {
+ writerAppender = new FrameTupleAppender(ctx.getFrameSize());
+ writerAppender.reset(writerFrame, true);
+ }
+
+ outFrameAccessor.reset(outFrame);
+
+ for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+
+ finalTupleBuilder.reset();
+
+ for (int k = 0; k < storedKeys.length; k++) {
+ finalTupleBuilder.addField(outFrameAccessor, i, storedKeys[k]);
+ }
+
+ if (isFinal) {
+
+ aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+
+ } else {
+
+ aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+ }
+
+ if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerAppender.reset(writerFrame, true);
+ if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+ throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
+ }
+ }
+ }
+ if (writerAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerAppender.reset(writerFrame, true);
+ }
+
+ outAppender.reset(outFrame, true);
+ }
+
+ private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
+ int runStart = runIndex * runFrameLimit;
+ boolean existNext = false;
+ if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
+ /**
+ * run already closed
+ */
+ existNext = false;
+ } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
+ /**
+ * not the last frame for this run
+ */
+ existNext = true;
+ if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+ tupleIndices[runIndex] = 0;
+ currentFrameIndexInRun[runIndex]++;
+ }
+ } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+ /**
+             * tuples remain in the last loaded frame of this run
+ */
+ existNext = true;
+ } else {
+ /**
+             * All tuples in the loaded frames of this run have
+             * been consumed; reload the next batch of frames.
+ */
+ tupleIndices[runIndex] = 0;
+ currentFrameIndexInRun[runIndex] = runStart;
+ /**
+ * read in batch
+ */
+ currentRunFrames[runIndex] = 0;
+ for (int j = 0; j < runFrameLimit; j++) {
+ int frameIndex = currentFrameIndexInRun[runIndex] + j;
+ if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
+ tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+ existNext = true;
+ currentRunFrames[runIndex]++;
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (existNext) {
+ topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]], tupleIndices[runIndex]);
+ } else {
+ topTuples.pop();
+ closeRun(runIndex, runCursors, tupleAccessors);
+ }
+ }
+
+ /**
+     * Close the run file reader and release the
+     * corresponding input-frame tuple accessors.
+ *
+ * @param index
+ * @param runCursors
+ * @param tupleAccessor
+ * @throws HyracksDataException
+ */
+ private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+ throws HyracksDataException {
+ if (runCursors[index] != null) {
+ runCursors[index].close();
+ runCursors[index] = null;
+ int frameOffset = index * runFrameLimit;
+ for (int j = 0; j < runFrameLimit; j++) {
+ tupleAccessor[frameOffset + j] = null;
+ }
+ }
+ }
+
+ private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = f;
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldLength(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
+            int l2 = fta2.getFieldLength(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+ return new Comparator<ReferenceEntry>() {
+
+ @Override
+ public int compare(ReferenceEntry o1, ReferenceEntry o2) {
+ FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
+ FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
+ int j1 = o1.getTupleIndex();
+ int j2 = o2.getTupleIndex();
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = f;
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 6f7b706..1045f14 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -14,44 +14,19 @@
*/
package edu.uci.ics.hyracks.dataflow.std.group;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
/**
*
@@ -123,29 +98,6 @@
builder.addBlockingEdge(aggregateAct, mergeAct);
}
- public static class AggregateActivityState extends AbstractTaskState {
- private LinkedList<RunFileReader> runs;
-
- private ISpillableTable gTable;
-
- public AggregateActivityState() {
- }
-
- private AggregateActivityState(JobId jobId, TaskId tId) {
- super(jobId, tId);
- }
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
- throw new UnsupportedOperationException();
- }
- }
-
private class AggregateActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@@ -157,84 +109,10 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
- final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private AggregateActivityState state;
-
- @Override
- public void open() throws HyracksDataException {
- state = new AggregateActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
- partition));
- state.runs = new LinkedList<RunFileReader>();
- state.gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories,
- firstNormalizerFactory, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
- ExternalGroupOperatorDescriptor.this.framesLimit);
- state.gTable.reset();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- /**
- * If the group table is too large, flush the table into
- * a run file.
- */
- if (!state.gTable.insert(accessor, i)) {
- flushFramesToRun();
- if (!state.gTable.insert(accessor, i))
- throw new HyracksDataException(
- "Failed to insert a new buffer into the aggregate operator!");
- }
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- //do nothing for failures
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (state.gTable.getFrameCount() >= 0) {
- if (state.runs.size() > 0) {
- /**
- * flush the memory into the run file.
- */
- flushFramesToRun();
- state.gTable.close();
- state.gTable = null;
- }
- }
- ctx.setTaskState(state);
- }
-
- private void flushFramesToRun() throws HyracksDataException {
- FileReference runFile;
- try {
- runFile = ctx.getJobletContext().createManagedWorkspaceFile(
- ExternalGroupOperatorDescriptor.class.getSimpleName());
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
- writer.open();
- try {
- state.gTable.sortFrames();
- state.gTable.flushFrames(writer, true);
- } catch (Exception ex) {
- throw new HyracksDataException(ex);
- } finally {
- writer.close();
- }
- state.gTable.reset();
- state.runs.add(((RunFileWriter) writer).createReader());
- }
- };
- return op;
+ return new ExternalGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), keyFields,
+ framesLimit, comparatorFactories, firstNormalizerFactory, aggregatorFactory,
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+ spillableTableFactory);
}
}
@@ -249,423 +127,9 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
-
- int[] keyFieldsInPartialResults = new int[keyFields.length];
- for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- keyFieldsInPartialResults[i] = i;
- }
-
- final IAggregatorDescriptor aggregator = mergerFactory.createAggregator(ctx, recordDescriptors[0],
- recordDescriptors[0], keyFields, keyFieldsInPartialResults);
- final AggregateState aggregateState = aggregator.createAggregateStates();
-
- final int[] storedKeys = new int[keyFields.length];
- /**
- * Get the list of the fields in the stored records.
- */
- for (int i = 0; i < keyFields.length; ++i) {
- storedKeys[i] = i;
- }
-
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
-
- IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
- /**
- * Input frames, one for each run file.
- */
- private List<ByteBuffer> inFrames;
-
- /**
- * Output frame.
- */
- private ByteBuffer outFrame, writerFrame;
- private final FrameTupleAppender outAppender = new FrameTupleAppender(ctx.getFrameSize());
- private FrameTupleAppender writerAppender;
-
- private LinkedList<RunFileReader> runs;
-
- private AggregateActivityState aggState;
-
- private ArrayTupleBuilder finalTupleBuilder;
-
- /**
- * how many frames to be read ahead once
- */
- private int runFrameLimit = 1;
-
- private int[] currentFrameIndexInRun;
- private int[] currentRunFrames;
-
- private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescriptors[0]);
-
- public void initialize() throws HyracksDataException {
- aggState = (AggregateActivityState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
- AGGREGATE_ACTIVITY_ID), partition));
- runs = aggState.runs;
- writer.open();
- try {
- if (runs.size() <= 0) {
- ISpillableTable gTable = aggState.gTable;
- if (gTable != null) {
- if (isOutputSorted)
- gTable.sortFrames();
- gTable.flushFrames(writer, false);
- }
- gTable = null;
- aggState = null;
- } else {
- aggState = null;
- runs = new LinkedList<RunFileReader>(runs);
- inFrames = new ArrayList<ByteBuffer>();
- outFrame = ctx.allocateFrame();
- outAppender.reset(outFrame, true);
- outFrameAccessor.reset(outFrame);
- while (runs.size() > 0) {
- try {
- doPass(runs);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
- inFrames.clear();
- }
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- aggregateState.close();
- writer.close();
- }
- }
-
- private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
- FileReference newRun = null;
- IFrameWriter writer = this.writer;
- boolean finalPass = false;
-
- while (inFrames.size() + 2 < framesLimit) {
- inFrames.add(ctx.allocateFrame());
- }
- int runNumber;
- if (runs.size() + 2 <= framesLimit) {
- finalPass = true;
- runFrameLimit = (framesLimit - 2) / runs.size();
- runNumber = runs.size();
- } else {
- runNumber = framesLimit - 2;
- newRun = ctx.getJobletContext().createManagedWorkspaceFile(
- ExternalGroupOperatorDescriptor.class.getSimpleName());
- writer = new RunFileWriter(newRun, ctx.getIOManager());
- writer.open();
- }
- try {
- currentFrameIndexInRun = new int[runNumber];
- currentRunFrames = new int[runNumber];
- /**
- * Create file readers for each input run file, only for
- * the ones fit into the inFrames
- */
- RunFileReader[] runFileReaders = new RunFileReader[runNumber];
- FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
- Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
- recordDescriptors[0], runNumber, comparator);
- /**
- * current tuple index in each run
- */
- int[] tupleIndices = new int[runNumber];
-
- for (int i = 0; i < runNumber; i++) {
- int runIndex = topTuples.peek().getRunid();
- tupleIndices[runIndex] = 0;
- // Load the run file
- runFileReaders[runIndex] = runs.get(runIndex);
- runFileReaders[runIndex].open();
-
- currentRunFrames[runIndex] = 0;
- currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
- for (int j = 0; j < runFrameLimit; j++) {
- int frameIndex = currentFrameIndexInRun[runIndex] + j;
- if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
- tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescriptors[0]);
- tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
- currentRunFrames[runIndex]++;
- if (j == 0)
- setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors,
- topTuples);
- } else {
- break;
- }
- }
- }
-
- /**
- * Start merging
- */
- while (!topTuples.areRunsExhausted()) {
- /**
- * Get the top record
- */
- ReferenceEntry top = topTuples.peek();
- int tupleIndex = top.getTupleIndex();
- int runIndex = topTuples.peek().getRunid();
- FrameTupleAccessor fta = top.getAccessor();
-
- int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
- if (currentTupleInOutFrame < 0
- || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
- /**
- * Initialize the first output record Reset the
- * tuple builder
- */
-
- tupleBuilder.reset();
-
- for(int k = 0; k < storedKeys.length; k++){
- tupleBuilder.addField(fta, tupleIndex, storedKeys[k]);
- }
-
- aggregator.init(tupleBuilder, fta, tupleIndex, aggregateState);
-
- if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
- flushOutFrame(writer, finalPass);
- if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
- throw new HyracksDataException(
- "The partial result is too large to be initialized in a frame.");
- }
- }
-
- } else {
- /**
- * if new tuple is in the same group of the
- * current aggregator do merge and output to the
- * outFrame
- */
-
- aggregator.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame,
- aggregateState);
-
- }
- tupleIndices[runIndex]++;
- setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
- }
-
- if (outAppender.getTupleCount() > 0) {
- flushOutFrame(writer, finalPass);
- outAppender.reset(outFrame, true);
- }
-
- aggregator.close();
-
- runs.subList(0, runNumber).clear();
- /**
- * insert the new run file into the beginning of the run
- * file list
- */
- if (!finalPass) {
- runs.add(0, ((RunFileWriter) writer).createReader());
- }
- } finally {
- if (!finalPass) {
- writer.close();
- }
- }
- }
-
- private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
-
- if (finalTupleBuilder == null) {
- finalTupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
- }
-
- if (writerFrame == null) {
- writerFrame = ctx.allocateFrame();
- }
-
- if (writerAppender == null) {
- writerAppender = new FrameTupleAppender(ctx.getFrameSize());
- writerAppender.reset(writerFrame, true);
- }
-
- outFrameAccessor.reset(outFrame);
-
- for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
-
- finalTupleBuilder.reset();
-
- for (int k = 0; k < storedKeys.length; k++) {
- finalTupleBuilder.addField(outFrameAccessor, i, storedKeys[k]);
- }
-
- if (isFinal) {
-
- aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
-
- } else {
-
- aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
- }
-
- if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
- finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
- FrameUtils.flushFrame(writerFrame, writer);
- writerAppender.reset(writerFrame, true);
- if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
- finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
- throw new HyracksDataException(
- "Aggregation output is too large to be fit into a frame.");
- }
- }
- }
- if (writerAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writerFrame, writer);
- writerAppender.reset(writerFrame, true);
- }
-
- outAppender.reset(outFrame, true);
- }
-
- private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
- throws HyracksDataException {
- int runStart = runIndex * runFrameLimit;
- boolean existNext = false;
- if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
- /**
- * run already closed
- */
- existNext = false;
- } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
- /**
- * not the last frame for this run
- */
- existNext = true;
- if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
- tupleIndices[runIndex] = 0;
- currentFrameIndexInRun[runIndex]++;
- }
- } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]]
- .getTupleCount()) {
- /**
- * the last frame has expired
- */
- existNext = true;
- } else {
- /**
- * If all tuples in the targeting frame have been
- * checked.
- */
- tupleIndices[runIndex] = 0;
- currentFrameIndexInRun[runIndex] = runStart;
- /**
- * read in batch
- */
- currentRunFrames[runIndex] = 0;
- for (int j = 0; j < runFrameLimit; j++) {
- int frameIndex = currentFrameIndexInRun[runIndex]
- + j;
- if (runCursors[runIndex].nextFrame(inFrames
- .get(frameIndex))) {
- tupleAccessors[frameIndex].reset(inFrames
- .get(frameIndex));
- existNext = true;
- currentRunFrames[runIndex]++;
- } else {
- break;
- }
- }
- }
-
- if (existNext) {
- topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]],
- tupleIndices[runIndex]);
- } else {
- topTuples.pop();
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
-
- /**
- * Close the run file, and also the corresponding readers and
- * input frame.
- *
- * @param index
- * @param runCursors
- * @param tupleAccessor
- * @throws HyracksDataException
- */
- private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
- throws HyracksDataException {
- if (runCursors[index] != null) {
- runCursors[index].close();
- runCursors[index] = null;
- int frameOffset = index * runFrameLimit;
- for (int j = 0; j < runFrameLimit; j++) {
- tupleAccessor[frameOffset + j] = null;
- }
- }
- }
-
- private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < keyFields.length; ++f) {
- int fIdx = f;
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldLength(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2_start = fta2.getFieldStartOffset(j2, fIdx);
- int l2_end = fta2.getFieldEndOffset(j2, fIdx);
- int l2 = l2_end - l2_start;
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
- };
- return op;
- }
-
- private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
- return new Comparator<ReferenceEntry>() {
-
- @Override
- public int compare(ReferenceEntry o1, ReferenceEntry o2) {
- FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
- int j1 = o1.getTupleIndex();
- int j2 = o2.getTupleIndex();
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < keyFields.length; ++f) {
- int fIdx = f;
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
-
- };
+ return new ExternalGroupMergeOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
+ AGGREGATE_ACTIVITY_ID), partition), comparatorFactories, keyFields, mergerFactory, isOutputSorted,
+ framesLimit, recordDescriptors[0]);
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupState.java
new file mode 100644
index 0000000..365c333
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupState.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+
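+/**
+ * State handed from the aggregate activity to the merge activity: the list of
+ * spilled runs and the in-memory spillable table. toBytes/fromBytes are
+ * unsupported since the state is consumed on the same node.
+ */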
+public class ExternalGroupState extends AbstractStateObject {
+ private LinkedList<RunFileReader> runs;
+
+ private ISpillableTable gTable;
+
+ public ExternalGroupState() {
+ }
+
+ ExternalGroupState(JobId jobId, Object id) {
+ super(jobId, id);
+ }
+
+ public LinkedList<RunFileReader> getRuns() {
+ return runs;
+ }
+
+ public void setRuns(LinkedList<RunFileReader> runs) {
+ this.runs = runs;
+ }
+
+ public ISpillableTable getSpillableTable() {
+ return gTable;
+ }
+
+ public void setSpillableTable(ISpillableTable gTable) {
+ this.gTable = gTable;
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupBuildOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupBuildOperatorNodePushable.java
new file mode 100644
index 0000000..98617c0
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupBuildOperatorNodePushable.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
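+/**
+ * Build phase of the hash group-by: inserts each incoming tuple into a
+ * GroupingHashTable and publishes the table to the output activity through a
+ * HashGroupState registered on the task context.
+ */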
+class HashGroupBuildOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final FrameTupleAccessor accessor;
+ private final Object stateId;
+ private final int[] keys;
+ private final ITuplePartitionComputerFactory tpcf;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+ private final int tableSize;
+ private final RecordDescriptor inRecordDescriptor;
+ private final RecordDescriptor outRecordDescriptor;
+
+ private HashGroupState state;
+
+ HashGroupBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keys,
+ ITuplePartitionComputerFactory tpcf, IBinaryComparatorFactory[] comparatorFactories,
+ IAggregatorDescriptorFactory aggregatorFactory, int tableSize, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor) {
+ this.ctx = ctx;
+ this.accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+ this.stateId = stateId;
+ this.keys = keys;
+ this.tpcf = tpcf;
+ this.comparatorFactories = comparatorFactories;
+ this.aggregatorFactory = aggregatorFactory;
+ this.tableSize = tableSize;
+ this.inRecordDescriptor = inRecordDescriptor;
+ this.outRecordDescriptor = outRecordDescriptor;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ state = new HashGroupState(ctx.getJobletContext().getJobId(), stateId);
+ state.setHashTable(new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
+ inRecordDescriptor, outRecordDescriptor, tableSize));
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ GroupingHashTable table = state.getHashTable();
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; ++i) {
+ try {
+ table.insert(accessor, i);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ ctx.setStateObject(state);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ throw new HyracksDataException("HashGroupOperator is failed.");
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index 49443d1..d9052b1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -14,11 +14,6 @@
*/
package edu.uci.ics.hyracks.dataflow.std.group;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -28,15 +23,9 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
/**
*
@@ -89,27 +78,6 @@
builder.addBlockingEdge(ha, oa);
}
- public static class HashBuildActivityState extends AbstractTaskState {
- private GroupingHashTable table;
-
- public HashBuildActivityState() {
- }
-
- private HashBuildActivityState(JobId jobId, TaskId tId) {
- super(jobId, tId);
- }
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
-
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
-
- }
- }
-
private class HashBuildActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@@ -120,44 +88,9 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
- final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- private HashBuildActivityState state;
-
- @Override
- public void open() throws HyracksDataException {
- state = new HashBuildActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
- partition));
- state.table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
- tableSize);
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; ++i) {
- try {
- state.table.insert(accessor, i);
- } catch (Exception e) {
- System.out.println(e.toString());
- throw new HyracksDataException(e);
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- ctx.setTaskState(state);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- throw new HyracksDataException("HashGroupOperator is failed.");
- }
- };
+ return new HashGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), keys, tpcf,
+ comparatorFactories, aggregatorFactory, tableSize, recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0), recordDescriptors[0]);
}
}
@@ -171,23 +104,8 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
- @Override
- public void initialize() throws HyracksDataException {
- HashBuildActivityState buildState = (HashBuildActivityState) ctx.getTaskState(new TaskId(
- new ActivityId(getOperatorId(), HASH_BUILD_ACTIVITY_ID), partition));
- GroupingHashTable table = buildState.table;
- writer.open();
- try {
- table.write(writer);
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- writer.close();
- }
- }
- };
+ return new HashGroupOutputOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
+ HASH_BUILD_ACTIVITY_ID), partition));
}
}
}
\ No newline at end of file
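
The two createPushRuntime bodies above only connect because both sides derive the same state id: the build runtime registers under new TaskId(getActivityId(), partition), while the output runtime looks up new TaskId(new ActivityId(getOperatorId(), HASH_BUILD_ACTIVITY_ID), partition). A sketch of the invariant this relies on; opId stands for getOperatorId(), and value equality of TaskId/ActivityId is an assumption made explicit here rather than something this patch shows:

    ActivityId buildAid = new ActivityId(opId, HASH_BUILD_ACTIVITY_ID);
    Object producerKey = new TaskId(buildAid, partition);          // registered via setStateObject
    Object consumerKey = new TaskId(new ActivityId(opId, HASH_BUILD_ACTIVITY_ID), partition); // looked up via getStateObject
    assert producerKey.equals(consumerKey); // the state store must key by equals()/hashCode()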
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOutputOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOutputOperatorNodePushable.java
new file mode 100644
index 0000000..b120297
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOutputOperatorNodePushable.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+class HashGroupOutputOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final Object stateId;
+
+ HashGroupOutputOperatorNodePushable(IHyracksTaskContext ctx, Object stateId) {
+ this.ctx = ctx;
+ this.stateId = stateId;
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ HashGroupState buildState = (HashGroupState) ctx.getStateObject(stateId);
+ GroupingHashTable table = buildState.getHashTable();
+ writer.open();
+ try {
+ table.write(writer);
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupState.java
new file mode 100644
index 0000000..b0eb5b8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupState.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+
+public class HashGroupState extends AbstractStateObject {
+ private GroupingHashTable table;
+
+ public HashGroupState() {
+ }
+
+ HashGroupState(JobId jobId, Object id) {
+ super(jobId, id);
+ }
+
+ public GroupingHashTable getHashTable() {
+ return table;
+ }
+
+ public void setHashTable(GroupingHashTable table) {
+ this.table = table;
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 773e1e5..6a0535b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -14,43 +14,19 @@
*/
package edu.uci.ics.hyracks.dataflow.std.join;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
-import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final int RPARTITION_ACTIVITY_ID = 0;
@@ -129,24 +105,6 @@
return memsize;
}
- public static class HashPartitionTaskState extends AbstractTaskState {
- private RunFileWriter[] fWriters;
-
- public HashPartitionTaskState(JobId jobId, TaskId taskId) {
- super(jobId, taskId);
- }
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
-
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
-
- }
- }
-
private class HashPartitionActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
private int operatorInputIndex;
@@ -159,97 +117,12 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), operatorInputIndex));
-
- private final ITuplePartitionComputer hpc = new FieldHashPartitionComputerFactory(keys,
- hashFunctionFactories).createPartitioner();
-
- private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-
- private ByteBuffer[] outbufs;
-
- private HashPartitionTaskState state;
-
- @Override
- public void close() throws HyracksDataException {
- for (int i = 0; i < numPartitions; i++) {
- ByteBuffer head = outbufs[i];
- accessor0.reset(head);
- if (accessor0.getTupleCount() > 0) {
- write(i, head);
- }
- closeWriter(i);
- }
-
- ctx.setTaskState(state);
- }
-
- private void closeWriter(int i) throws HyracksDataException {
- RunFileWriter writer = state.fWriters[i];
- if (writer != null) {
- writer.close();
- }
- }
-
- private void write(int i, ByteBuffer head) throws HyracksDataException {
- RunFileWriter writer = state.fWriters[i];
- if (writer == null) {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- GraceHashJoinOperatorDescriptor.class.getSimpleName());
- writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- state.fWriters[i] = writer;
- }
- writer.nextFrame(head);
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor0.reset(buffer);
- int tCount = accessor0.getTupleCount();
- for (int i = 0; i < tCount; ++i) {
-
- int entry = hpc.partition(accessor0, i, numPartitions);
- ByteBuffer outbuf = outbufs[entry];
- appender.reset(outbuf, false);
- if (!appender.append(accessor0, i)) {
- // buffer is full, ie. we cannot fit the tuple
- // into the buffer -- write it to disk
- write(entry, outbuf);
- outbuf.clear();
- appender.reset(outbuf, true);
- if (!appender.append(accessor0, i)) {
- throw new HyracksDataException("Item too big to fit in frame");
- }
- }
- }
- }
-
- @Override
- public void open() throws HyracksDataException {
- state = new HashPartitionTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
- partition));
- outbufs = new ByteBuffer[numPartitions];
- state.fWriters = new RunFileWriter[numPartitions];
- for (int i = 0; i < numPartitions; i++) {
- outbufs[i] = ctx.allocateFrame();
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
- };
- return op;
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new GraceHashJoinPartitionBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition),
+ keys, hashFunctionFactories, comparatorFactories, (int) Math.ceil(Math.sqrt(inputsize0 * factor
+ / nPartitions)), recordDescProvider.getInputRecordDescriptor(getOperatorId(),
+ operatorInputIndex));
}
}
@@ -263,92 +136,15 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
- final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
- if (isLeftOuter) {
- for (int i = 0; i < nullWriterFactories1.length; i++) {
- nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
- }
- }
+ int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
- IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
- private InMemoryHashJoin joiner;
-
- private RunFileWriter[] buildWriters;
- private RunFileWriter[] probeWriters;
- private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
-
- @Override
- public void initialize() throws HyracksDataException {
- HashPartitionTaskState rState = (HashPartitionTaskState) ctx.getTaskState(new TaskId(
- new ActivityId(getOperatorId(), RPARTITION_ACTIVITY_ID), partition));
- HashPartitionTaskState sState = (HashPartitionTaskState) ctx.getTaskState(new TaskId(
- new ActivityId(getOperatorId(), SPARTITION_ACTIVITY_ID), partition));
- buildWriters = sState.fWriters;
- probeWriters = rState.fWriters;
-
- ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
- new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
- ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(numPartitions,
- new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)).createPartitioner();
-
- writer.open();// open for probe
-
- try {
-
- ByteBuffer buffer = ctx.allocateFrame();// input
- // buffer
- int tableSize = (int) (numPartitions * recordsPerFrame * factor);
- ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-
- for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
- RunFileWriter buildWriter = buildWriters[partitionid];
- RunFileWriter probeWriter = probeWriters[partitionid];
- if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
- continue;
- }
- table.reset();
- joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(),
- rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
- new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table);
-
- // build
- if (buildWriter != null) {
- RunFileReader buildReader = buildWriter.createReader();
- buildReader.open();
- while (buildReader.nextFrame(buffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame();
- FrameUtils.copy(buffer, copyBuffer);
- joiner.build(copyBuffer);
- buffer.clear();
- }
- buildReader.close();
- }
-
- // probe
- RunFileReader probeReader = probeWriter.createReader();
- probeReader.open();
- while (probeReader.nextFrame(buffer)) {
- joiner.join(buffer, writer);
- buffer.clear();
- }
- probeReader.close();
- joiner.closeJoin(writer);
- }
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- writer.close();
- }
- }
- };
- return op;
+ return new GraceHashJoinOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
+ RPARTITION_ACTIVITY_ID), partition), new TaskId(new ActivityId(getOperatorId(),
+ SPARTITION_ACTIVITY_ID), partition), recordsPerFrame, factor, keys0, keys1, hashFunctionFactories,
+ comparatorFactories, nullWriterFactories1, rd1, rd0, recordDescriptors[0], numPartitions,
+ isLeftOuter);
}
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
new file mode 100644
index 0000000..91f509d
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
+
+class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final Object state0Id;
+ private final Object state1Id;
+ private final int[] keys0;
+ private final int[] keys1;
+ private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final INullWriterFactory[] nullWriterFactories;
+ private final RecordDescriptor rd0;
+ private final RecordDescriptor rd1;
+ private final int recordsPerFrame;
+ private final double factor;
+ private final int numPartitions;
+ private final boolean isLeftOuter;
+
+ GraceHashJoinOperatorNodePushable(IHyracksTaskContext ctx, Object state0Id, Object state1Id, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] comparatorFactories, INullWriterFactory[] nullWriterFactories,
+ RecordDescriptor rd1, RecordDescriptor rd0, RecordDescriptor outRecordDescriptor, int numPartitions,
+ boolean isLeftOuter) {
+ this.ctx = ctx;
+ this.state0Id = state0Id;
+ this.state1Id = state1Id;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ this.nullWriterFactories = nullWriterFactories;
+ this.rd0 = rd0;
+ this.rd1 = rd1;
+ this.numPartitions = numPartitions;
+ this.recordsPerFrame = recordsPerFrame;
+ this.factor = factor;
+ this.isLeftOuter = isLeftOuter;
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ GraceHashJoinPartitionState rState = (GraceHashJoinPartitionState) ctx.getStateObject(state0Id);
+ GraceHashJoinPartitionState sState = (GraceHashJoinPartitionState) ctx.getStateObject(state1Id);
+ RunFileWriter[] buildWriters = sState.getRunWriters();
+ RunFileWriter[] probeWriters = rState.getRunWriters();
+
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
+ new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
+ ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(numPartitions,
+ new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)).createPartitioner();
+
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories.length; i++) {
+ nullWriters1[i] = nullWriterFactories[i].createNullWriter();
+ }
+ }
+
+        writer.open(); // open the downstream writer for the probe phase
+
+ try {
+
+            // frame used to read partitions back from the run files
+            ByteBuffer buffer = ctx.allocateFrame();
+ int tableSize = (int) (numPartitions * recordsPerFrame * factor);
+ ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+
+ for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
+ if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
+ continue;
+ }
+ table.reset();
+ InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
+ ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
+ new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table);
+
+ // build
+ if (buildWriter != null) {
+ RunFileReader buildReader = buildWriter.createReader();
+ buildReader.open();
+ while (buildReader.nextFrame(buffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(buffer, copyBuffer);
+ joiner.build(copyBuffer);
+ buffer.clear();
+ }
+ buildReader.close();
+ }
+
+ // probe
+ RunFileReader probeReader = probeWriter.createReader();
+ probeReader.open();
+ while (probeReader.nextFrame(buffer)) {
+ joiner.join(buffer, writer);
+ buffer.clear();
+ }
+ probeReader.close();
+ joiner.closeJoin(writer);
+ }
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
+}
\ No newline at end of file
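
The sizing expressions in this join are easy to misread inline: the grace partition count is ceil(sqrt(inputsize0 * factor / nPartitions)), computed identically by both activities, and each per-partition in-memory table holds numPartitions * recordsPerFrame * factor slots. A worked example with illustrative numbers (none of these values are defaults from the code):

    int inputsize0 = 1000000;    // estimated build input size in frames (hypothetical)
    double factor = 1.2;         // hash table fudge factor
    int nPartitions = 4;         // degree of parallelism
    int recordsPerFrame = 100;

    int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions)); // 548
    int tableSize = (int) (numPartitions * recordsPerFrame * factor);                  // 65760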
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
new file mode 100644
index 0000000..719b736
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+class GraceHashJoinPartitionBuildOperatorNodePushable extends
+ AbstractUnaryInputSinkOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final Object stateId;
+ private final int numPartitions;
+ private final IBinaryComparator[] comparators;
+ private final FrameTupleAccessor accessor0;
+ private final ITuplePartitionComputer hpc;
+ private final FrameTupleAppender appender;
+ private ByteBuffer[] outbufs;
+ private GraceHashJoinPartitionState state;
+
+ GraceHashJoinPartitionBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keys,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+ int numPartitions, RecordDescriptor inRecordDescriptor) {
+ this.ctx = ctx;
+ this.stateId = stateId;
+ this.numPartitions = numPartitions;
+ accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ hpc = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories).createPartitioner();
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ for (int i = 0; i < numPartitions; i++) {
+ ByteBuffer head = outbufs[i];
+ accessor0.reset(head);
+ if (accessor0.getTupleCount() > 0) {
+ write(i, head);
+ }
+ closeWriter(i);
+ }
+
+ ctx.setStateObject(state);
+ }
+
+ private void closeWriter(int i) throws HyracksDataException {
+ RunFileWriter writer = state.getRunWriters()[i];
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ private void write(int i, ByteBuffer head) throws HyracksDataException {
+ RunFileWriter writer = state.getRunWriters()[i];
+ if (writer == null) {
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ GraceHashJoinOperatorDescriptor.class.getSimpleName());
+ writer = new RunFileWriter(file, ctx.getIOManager());
+ writer.open();
+ state.getRunWriters()[i] = writer;
+ }
+ writer.nextFrame(head);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor0.reset(buffer);
+ int tCount = accessor0.getTupleCount();
+ for (int i = 0; i < tCount; ++i) {
+
+ int entry = hpc.partition(accessor0, i, numPartitions);
+ ByteBuffer outbuf = outbufs[entry];
+ appender.reset(outbuf, false);
+ if (!appender.append(accessor0, i)) {
+                    // buffer is full, i.e. we cannot fit the tuple
+ // into the buffer -- write it to disk
+ write(entry, outbuf);
+ outbuf.clear();
+ appender.reset(outbuf, true);
+ if (!appender.append(accessor0, i)) {
+ throw new HyracksDataException("Item too big to fit in frame");
+ }
+ }
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ state = new GraceHashJoinPartitionState(ctx.getJobletContext().getJobId(), stateId);
+ outbufs = new ByteBuffer[numPartitions];
+ state.setRunWriters(new RunFileWriter[numPartitions]);
+ for (int i = 0; i < numPartitions; i++) {
+ outbufs[i] = ctx.allocateFrame();
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+}
\ No newline at end of file
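
The nextFrame() above is an instance of the standard append-or-spill idiom: try to append the tuple to the partition's staging frame; if the frame is full, flush it to that partition's run file, reset it, and retry once, treating a second failure as a tuple larger than a frame. Distilled, with flush() standing in for the write(i, head) helper above:

    appender.reset(frame, false);          // keep appending after the frame's existing contents
    if (!appender.append(accessor, i)) {
        flush(frame);                      // frame full: spill it to the partition's run file
        frame.clear();
        appender.reset(frame, true);       // start over with an empty frame
        if (!appender.append(accessor, i)) {
            throw new HyracksDataException("Item too big to fit in frame");
        }
    }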
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java
new file mode 100644
index 0000000..906042e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+
+public class GraceHashJoinPartitionState extends AbstractStateObject {
+ private RunFileWriter[] fWriters;
+
+ public GraceHashJoinPartitionState(JobId jobId, Object id) {
+ super(jobId, id);
+ }
+
+ public RunFileWriter[] getRunWriters() {
+ return fWriters;
+ }
+
+ public void setRunWriters(RunFileWriter[] fWriters) {
+ this.fWriters = fWriters;
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 99f0020..2521830 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -47,7 +47,7 @@
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
@@ -138,7 +138,7 @@
builder.addTargetEdge(0, phase2, 0);
}
- public static class BuildAndPartitionTaskState extends AbstractTaskState {
+ public static class BuildAndPartitionTaskState extends AbstractStateObject {
private RunFileWriter[] fWriters;
private InMemoryHashJoin joiner;
private int nPartitions;
@@ -211,7 +211,7 @@
closeWriter(i);
}
- ctx.setTaskState(state);
+ ctx.setStateObject(state);
}
@Override
@@ -390,7 +390,7 @@
@Override
public void open() throws HyracksDataException {
- state = (BuildAndPartitionTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
BUILD_AND_PARTITION_ACTIVITY_ID), partition));
writer.open();
buildWriters = state.fWriters;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 61fc0b3..89ae09e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -41,7 +41,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
@@ -105,7 +105,7 @@
builder.addBlockingEdge(hba, hpa);
}
- public static class HashBuildTaskState extends AbstractTaskState {
+ public static class HashBuildTaskState extends AbstractStateObject {
private InMemoryHashJoin joiner;
public HashBuildTaskState() {
@@ -178,7 +178,7 @@
@Override
public void close() throws HyracksDataException {
- ctx.setTaskState(state);
+ ctx.setStateObject(state);
}
@Override
@@ -204,7 +204,7 @@
@Override
public void open() throws HyracksDataException {
- state = (HashBuildTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ state = (HashBuildTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
BUILD_ACTIVITY_ID), partition));
writer.open();
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 21a828c..7082948 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -36,7 +36,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
@@ -72,7 +72,7 @@
builder.addBlockingEdge(jc, nlj);
}
- public static class JoinCacheTaskState extends AbstractTaskState {
+ public static class JoinCacheTaskState extends AbstractStateObject {
private NestedLoopJoin joiner;
public JoinCacheTaskState() {
@@ -129,7 +129,7 @@
@Override
public void close() throws HyracksDataException {
state.joiner.closeCache();
- ctx.setTaskState(state);
+ ctx.setStateObject(state);
}
@Override
@@ -156,7 +156,7 @@
@Override
public void open() throws HyracksDataException {
- state = (JoinCacheTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ state = (JoinCacheTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
JOIN_CACHE_ACTIVITY_ID), partition));
writer.open();
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 87e2b2c..fd91b66 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -47,7 +47,7 @@
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
@@ -201,7 +201,7 @@
return numberOfPartitions;
}
- public static class BuildAndPartitionTaskState extends AbstractTaskState {
+ public static class BuildAndPartitionTaskState extends AbstractStateObject {
private int memForJoin;
private int numOfPartitions;
@@ -290,7 +290,7 @@
@Override
public void close() throws HyracksDataException {
state.hybridHJ.closeBuild();
- ctx.setTaskState(state);
+ ctx.setStateObject(state);
}
@Override
@@ -353,7 +353,7 @@
@Override
public void open() throws HyracksDataException {
- state = (BuildAndPartitionTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
BUILD_AND_PARTITION_ACTIVITY_ID), partition));
writer.open();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 83160e5..993ff27 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -34,7 +34,7 @@
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -63,7 +63,7 @@
builder.addBlockingEdge(ma, ra);
}
- public static class MaterializerTaskState extends AbstractTaskState {
+ public static class MaterializerTaskState extends AbstractStateObject {
private RunFileWriter out;
public MaterializerTaskState() {
@@ -115,7 +115,7 @@
@Override
public void close() throws HyracksDataException {
state.out.close();
- ctx.setTaskState(state);
+ ctx.setStateObject(state);
}
@Override
@@ -139,7 +139,7 @@
@Override
public void initialize() throws HyracksDataException {
ByteBuffer frame = ctx.allocateFrame();
- MaterializerTaskState state = (MaterializerTaskState) ctx.getTaskState(new TaskId(new ActivityId(
+ MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
RunFileReader in = state.out.createReader();
writer.open();
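
Several operators touched by this patch share the same run-file round trip: frames are appended through a RunFileWriter backed by a managed workspace file, then replayed through the RunFileReader returned by createReader(). A condensed sketch assembled from the calls visible in this patch (the workspace file name is arbitrary):

    FileReference file = ctx.getJobletContext().createManagedWorkspaceFile("example");
    RunFileWriter out = new RunFileWriter(file, ctx.getIOManager());
    out.open();
    out.nextFrame(frame);                  // append one frame; repeat as needed
    out.close();

    RunFileReader in = out.createReader();
    in.open();
    ByteBuffer buffer = ctx.allocateFrame();
    while (in.nextFrame(buffer)) {
        // consume the replayed frame
        buffer.clear();
    }
    in.close();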
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index 3e99000..b2f516b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -32,7 +32,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
@@ -40,7 +40,7 @@
private static final int COLLECT_ACTIVITY_ID = 0;
private static final int SPLIT_ACTIVITY_ID = 1;
- public static class CollectTaskState extends AbstractTaskState {
+ public static class CollectTaskState extends AbstractStateObject {
private ArrayList<Object[]> buffer;
public CollectTaskState() {
@@ -91,7 +91,7 @@
@Override
public void close() throws HyracksDataException {
- ctx.setTaskState(state);
+ ctx.setStateObject(state);
}
@Override
@@ -134,7 +134,7 @@
@Override
public void open() throws HyracksDataException {
- state = (CollectTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ state = (CollectTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
COLLECT_ACTIVITY_ID), partition));
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 0effefb..db3edaa 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -36,7 +36,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -84,7 +84,7 @@
builder.addBlockingEdge(sa, ma);
}
- public static class SortTaskState extends AbstractTaskState {
+ public static class SortTaskState extends AbstractStateObject {
private List<IFrameReader> runs;
private FrameSorter frameSorter;
@@ -138,7 +138,7 @@
runGen.close();
state.runs = runGen.getRuns();
state.frameSorter = runGen.getFrameSorter();
- ctx.setTaskState(state);
+ ctx.setStateObject(state);
}
@Override
@@ -163,7 +163,7 @@
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- SortTaskState state = (SortTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
SORT_ACTIVITY_ID), partition));
List<IFrameReader> runs = state.runs;
FrameSorter frameSorter = state.frameSorter;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 30bebe9..2689375 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -33,7 +33,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -76,7 +76,7 @@
builder.addBlockingEdge(sa, ma);
}
- public static class SortTaskState extends AbstractTaskState {
+ public static class SortTaskState extends AbstractStateObject {
private FrameSorter frameSorter;
public SortTaskState() {
@@ -124,7 +124,7 @@
@Override
public void close() throws HyracksDataException {
state.frameSorter.sortFrames();
- ctx.setTaskState(state);
+ ctx.setStateObject(state);
}
@Override
@@ -150,7 +150,7 @@
public void initialize() throws HyracksDataException {
writer.open();
try {
- SortTaskState state = (SortTaskState) ctx.getTaskState(new TaskId(new ActivityId(
+ SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(
getOperatorId(), SORT_ACTIVITY_ID), partition));
state.frameSorter.flushFrames(writer);
} catch (Exception e) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index a8653d1..1b9d2fe 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -36,7 +36,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -107,7 +107,7 @@
builder.addBlockingEdge(osa, oma);
}
- public static class OptimizedSortTaskState extends AbstractTaskState {
+ public static class OptimizedSortTaskState extends AbstractStateObject {
private List<IFrameReader> runs;
public OptimizedSortTaskState() {
@@ -165,7 +165,7 @@
new TaskId(getActivityId(), partition));
runGen.close();
state.runs = runGen.getRuns();
- ctx.setTaskState(state);
+ ctx.setStateObject(state);
}
@@ -191,7 +191,7 @@
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- OptimizedSortTaskState state = (OptimizedSortTaskState) ctx.getTaskState(new TaskId(new ActivityId(
+ OptimizedSortTaskState state = (OptimizedSortTaskState) ctx.getStateObject(new TaskId(new ActivityId(
getOperatorId(), SORT_ACTIVITY_ID), partition));
List<IFrameReader> runs = state.runs;
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index c2d3ebf..1ee4400 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -19,6 +19,7 @@
import org.junit.Test;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -39,8 +40,8 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.LimitOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.OptimizedExternalSortOperatorDescriptor;
public class OptimizedSortMergeTest extends AbstractIntegrationTest {
@@ -74,7 +75,9 @@
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+ createTempFile().getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -125,7 +128,9 @@
LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, filter, NC1_ID);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+ createTempFile().getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
index 2f27bf0..26e335e 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
@@ -16,6 +16,16 @@
import java.nio.ByteBuffer;
+/**
+ * Accepts buffers.
+ *
+ * @author vinayakb
+ */
public interface IBufferAcceptor {
+ /**
+ * Accept a buffer.
+ *
+     * @param buffer
+     *            - the buffer being accepted.
+ */
public void accept(ByteBuffer buffer);
}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
index c395ac9..5e9bf02 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
@@ -14,8 +14,23 @@
*/
package edu.uci.ics.hyracks.net.buffers;
+/**
+ * A buffer acceptor that can be closed to indicate the end of a transmission, or handed
+ * an error code to indicate that the transmission failed.
+ *
+ * @author vinayakb
+ */
public interface ICloseableBufferAcceptor extends IBufferAcceptor {
+ /**
+ * Close the buffer acceptor.
+ */
public void close();
+ /**
+ * Indicate that an error occurred.
+ *
+ * @param ecode
+ * - the error code.
+ */
public void error(int ecode);
}
\ No newline at end of file
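
Both acceptor interfaces are small enough to implement inline. A minimal sketch with anonymous classes; the method bodies are illustrative, not taken from the codebase:

    import java.nio.ByteBuffer;
    import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
    import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;

    IBufferAcceptor emptyAcceptor = new IBufferAcceptor() {
        @Override
        public void accept(ByteBuffer buffer) {
            // a buffer has been drained; return it to a pool for reuse
        }
    };

    ICloseableBufferAcceptor fullAcceptor = new ICloseableBufferAcceptor() {
        @Override
        public void accept(ByteBuffer buffer) {
            // a buffer has been filled; hand it to the consumer
        }

        @Override
        public void close() {
            // the sender signaled end of transmission
        }

        @Override
        public void error(int ecode) {
            // the transmission failed with the given error code
        }
    };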
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 4b55d4b..63cb6b5 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -28,6 +28,11 @@
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.exceptions.NetException;
+/**
+ * Handle to a channel that represents a logical full-duplex communication end-point.
+ *
+ * @author vinayakb
+ */
public class ChannelControlBlock {
private static final Logger LOGGER = Logger.getLogger(ChannelControlBlock.class.getName());
@@ -45,6 +50,8 @@
private final AtomicBoolean remoteClose;
+ private final AtomicBoolean remoteCloseAck;
+
ChannelControlBlock(ChannelSet cSet, int channelId) {
this.cSet = cSet;
this.channelId = channelId;
@@ -53,16 +60,27 @@
localClose = new AtomicBoolean();
localCloseAck = new AtomicBoolean();
remoteClose = new AtomicBoolean();
+ remoteCloseAck = new AtomicBoolean();
}
int getChannelId() {
return channelId;
}
+ /**
+     * Get the read interface of this channel.
+ *
+ * @return the read interface.
+ */
public IChannelReadInterface getReadInterface() {
return ri;
}
+ /**
+ * Get the write interface of this channel.
+ *
+ * @return the write interface.
+ */
public IChannelWriteInterface getWriteInterface() {
return wi;
}
@@ -321,6 +339,10 @@
remoteClose.set(true);
}
+ void reportRemoteEOSAck() {
+ remoteCloseAck.set(true);
+ }
+
boolean getRemoteEOS() {
return remoteClose.get();
}
@@ -336,12 +358,13 @@
}
boolean completelyClosed() {
- return localCloseAck.get() && remoteClose.get();
+ return localCloseAck.get() && remoteCloseAck.get();
}
@Override
public String toString() {
return "Channel:" + channelId + "[localClose: " + localClose + " localCloseAck: " + localCloseAck
- + " remoteClose: " + remoteClose + " readCredits: " + ri.credits + " writeCredits: " + wi.credits + "]";
+                + " remoteClose: " + remoteClose + " remoteCloseAck: " + remoteCloseAck + " readCredits: " + ri.credits
+ + " writeCredits: " + wi.credits + "]";
}
}
\ No newline at end of file
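
The new remoteCloseAck flag changes what "completely closed" means: a channel now requires both acknowledgements rather than merely the remote's EOS. A toy model of the four flags, illustrating only the patched predicate, not the muxdemux protocol machinery:

    final class ChannelCloseModel {
        boolean localClose;      // this side sent EOS
        boolean localCloseAck;   // this side's EOS was acknowledged
        boolean remoteClose;     // the remote side sent EOS
        boolean remoteCloseAck;  // the remote side's EOS was acknowledged

        boolean completelyClosed() {
            // both directions of the close handshake must be acknowledged
            return localCloseAck && remoteCloseAck;
        }
    }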
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
index 0fc9b2a..feb605f 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
@@ -14,6 +14,17 @@
*/
package edu.uci.ics.hyracks.net.protocols.muxdemux;
+/**
+ * Callback interface to report opening of channels.
+ *
+ * @author vinayakb
+ */
public interface IChannelOpenListener {
+ /**
+ * Indicates that a remote endpoint has opened a channel to this receiver.
+ *
+ * @param channel
+ * - The newly opened channel.
+ */
public void channelOpened(ChannelControlBlock channel);
}
\ No newline at end of file
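A minimal sketch of an implementation, suitable for passing to the MuxDemux constructor documented later in this diff; the class and its logging body are illustrative only:

    import java.util.logging.Logger;

    import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
    import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;

    // Hypothetical listener that just logs newly opened channels.
    public class LoggingChannelOpenListener implements IChannelOpenListener {
        private static final Logger LOGGER = Logger.getLogger(LoggingChannelOpenListener.class.getName());

        @Override
        public void channelOpened(ChannelControlBlock channel) {
            // Real listeners would register buffer acceptors on the channel here.
            LOGGER.info("Channel opened: " + channel);
        }
    }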
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
index 468a617..178ad78 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
@@ -17,8 +17,26 @@
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+/**
+ * Represents the read interface of a {@link ChannelControlBlock}.
+ *
+ * @author vinayakb
+ */
public interface IChannelReadInterface {
+ /**
+ * Set the callback that will be invoked by the network layer when a buffer has been
+ * filled with data received from the remote side.
+ *
+ * @param fullBufferAcceptor
+ * - the full buffer acceptor.
+ */
public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor);
+ /**
+ * Get the acceptor that collects empty buffers when the client has finished consuming
+ * a previously full buffer.
+ *
+ * @return the empty buffer acceptor.
+ */
public IBufferAcceptor getEmptyBufferAcceptor();
}
\ No newline at end of file
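Putting the two methods together, a client might wire up the read side as sketched below, again assuming accept(ByteBuffer) is inherited from IBufferAcceptor; consumed buffers are recycled through the empty-buffer acceptor:

    import java.nio.ByteBuffer;

    import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
    import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelReadInterface;

    public class ReadSideWiring {
        // Hypothetical helper: consume each full buffer, then hand it back empty.
        static void attach(final IChannelReadInterface ri) {
            ri.setFullBufferAcceptor(new ICloseableBufferAcceptor() {
                @Override
                public void accept(ByteBuffer buffer) {
                    // ... process buffer contents here ...
                    buffer.clear();
                    ri.getEmptyBufferAcceptor().accept(buffer); // recycle for further reads
                }

                @Override
                public void close() {
                    // remote side signalled end of stream
                }

                @Override
                public void error(int ecode) {
                    // remote side reported error ecode
                }
            });
        }
    }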
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
index 1e53d71..8e71246 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
@@ -17,8 +17,26 @@
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+/**
+ * Represents the write interface of a {@link ChannelControlBlock}.
+ *
+ * @author vinayakb
+ */
public interface IChannelWriteInterface {
+ /**
+ * Set the callback that is invoked by the network layer when a full buffer has been
+ * emptied by writing its data to the remote end.
+ *
+ * @param emptyBufferAcceptor
+ * - the empty buffer acceptor.
+ */
public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor);
+ /**
+ * Get the full buffer acceptor that accepts buffers filled with data that need to be written
+ * to the remote end.
+ *
+ * @return the full buffer acceptor.
+ */
public ICloseableBufferAcceptor getFullBufferAcceptor();
}
\ No newline at end of file
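The write side mirrors the read side: filled buffers flow in through getFullBufferAcceptor() and come back for reuse through the empty-buffer acceptor. A sketch under the same assumed IBufferAcceptor contract; the pooling scheme is hypothetical:

    import java.nio.ByteBuffer;
    import java.util.concurrent.LinkedBlockingQueue;

    import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
    import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelWriteInterface;

    public class WriteSideWiring {
        // Pool of buffers the network layer has finished writing.
        private final LinkedBlockingQueue<ByteBuffer> emptyPool = new LinkedBlockingQueue<ByteBuffer>();

        void attach(IChannelWriteInterface wi) {
            wi.setEmptyBufferAcceptor(new IBufferAcceptor() {
                @Override
                public void accept(ByteBuffer buffer) {
                    emptyPool.add(buffer); // reclaim for the next write
                }
            });
        }

        void send(IChannelWriteInterface wi, ByteBuffer filled) {
            wi.getFullBufferAcceptor().accept(filled); // hand off for transmission
        }
    }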
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index e717c45..3772df3 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -26,6 +26,12 @@
import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
+/**
+ * A {@link MultiplexedConnection} can be used by clients to create multiple "channels",
+ * each carrying an independent full-duplex conversation.
+ *
+ * @author vinayakb
+ */
public class MultiplexedConnection implements ITCPConnectionEventListener {
private static final Logger LOGGER = Logger.getLogger(MultiplexedConnection.class.getName());
@@ -51,7 +57,7 @@
private Exception error;
- public MultiplexedConnection(MuxDemux muxDemux) {
+ MultiplexedConnection(MuxDemux muxDemux) {
this.muxDemux = muxDemux;
pendingWriteEventsCounter = new IEventCounter() {
private int counter;
@@ -128,7 +134,14 @@
cSet.notifyIOError();
}
- public ChannelControlBlock openChannel() throws NetException, InterruptedException {
+ /**
+ * Open a channel to the other side.
+ *
+ * @return a {@link ChannelControlBlock} representing the newly opened channel.
+ * @throws NetException
+ * - A network failure occurred.
+ */
+ public ChannelControlBlock openChannel() throws NetException {
synchronized (this) {
if (connectionFailure) {
throw new NetException(error);
@@ -264,6 +277,8 @@
BitSet pendingEOSAckBitmap = cSet.getPendingEOSAckBitmap();
for (int j = pendingEOSAckBitmap.nextSetBit(0); j >= 0; j = pendingEOSAckBitmap.nextSetBit(j)) {
pendingEOSAckBitmap.clear(j);
+ ChannelControlBlock ccb = cSet.getCCB(j);
+ ccb.reportRemoteEOSAck();
writerState.command.setChannelId(j);
writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL_ACK);
writerState.command.setData(0);
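Since openChannel() no longer declares InterruptedException, callers only need to handle NetException. A minimal sketch (the wrapper class and rethrow policy are illustrative):

    import edu.uci.ics.hyracks.net.exceptions.NetException;
    import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
    import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;

    public class OpenChannelExample {
        static ChannelControlBlock open(MultiplexedConnection mConn) {
            try {
                return mConn.openChannel();
            } catch (NetException e) {
                // The connection failed; the original cause is wrapped in e.
                throw new RuntimeException(e);
            }
        }
    }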
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index 8548bb8..22a57f8 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -24,6 +24,13 @@
import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
import edu.uci.ics.hyracks.net.protocols.tcp.TCPEndpoint;
+/**
+ * Multiplexed Connection Manager.
+ * Every participant that wants to use multiplexed connections must create an instance
+ * of this class.
+ *
+ * @author vinayakb
+ */
public class MuxDemux {
private final InetSocketAddress localAddress;
@@ -35,6 +42,16 @@
private final MuxDemuxPerformanceCounters perfCounters;
+ /**
+ * Constructor.
+ *
+ * @param localAddress
+ * - TCP/IP socket address to listen on
+ * @param listener
+ * - Callback interface to report channel events
+ * @param nThreads
+ * - Number of threads to use for data transfer
+ */
public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads) {
this.localAddress = localAddress;
this.channelOpenListener = listener;
@@ -80,10 +97,27 @@
perfCounters = new MuxDemuxPerformanceCounters();
}
+ /**
+ * Start this {@link MuxDemux}, listening for incoming remote connections and enabling
+ * outgoing connections to be initiated.
+ *
+ * @throws IOException
+ * - An I/O error occurred while starting the TCP endpoint.
+ */
public void start() throws IOException {
tcpEndpoint.start(localAddress);
}
+ /**
+ * Create a {@link MultiplexedConnection} that can create channels to the specified remote address.
+ * The remote address must have a {@link MuxDemux} listening for connections.
+ *
+ * @param remoteAddress
+ * - Address of the remote {@link MuxDemux}
+ * @return a {@link MultiplexedConnection} connected to the remote address.
+ * @throws InterruptedException
+ * - This call was interrupted while waiting for connection setup.
+ * In such an event, it is safe to retry the {@link #connect(InetSocketAddress)} call.
+ * @throws NetException
+ * - A network failure occurred.
+ */
public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException, NetException {
MultiplexedConnection mConn = null;
synchronized (this) {
@@ -102,10 +136,20 @@
return channelOpenListener;
}
+ /**
+ * Get the local address on which this {@link MuxDemux} is listening for connections.
+ *
+ * @return local TCP/IP socket address.
+ */
public InetSocketAddress getLocalAddress() {
return tcpEndpoint.getLocalAddress();
}
+ /**
+ * Get the performance counters used for collecting efficiency metrics.
+ *
+ * @return the performance counters.
+ */
public MuxDemuxPerformanceCounters getPerformanceCounters() {
return perfCounters;
}
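Taken together, the MuxDemux lifecycle looks roughly like the sketch below. The loopback address, port, and thread count are illustrative, connecting back to the local endpoint is assumed to be permitted, and LoggingChannelOpenListener is the hypothetical listener sketched earlier:

    import java.net.InetSocketAddress;

    import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
    import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
    import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;

    public class MuxDemuxExample {
        public static void main(String[] args) throws Exception {
            InetSocketAddress local = new InetSocketAddress("127.0.0.1", 5001);

            // Create and start a participant: bind the endpoint and begin accepting.
            MuxDemux md = new MuxDemux(local, new LoggingChannelOpenListener(), 2);
            md.start();

            // Connect to a peer MuxDemux (here, ourselves) and open one channel.
            MultiplexedConnection mConn = md.connect(md.getLocalAddress());
            ChannelControlBlock ccb = mConn.openChannel();
            System.out.println(ccb);
        }
    }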
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index e1743a1..b776f55 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -19,8 +19,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -92,12 +91,12 @@
}
@Override
- public void setTaskState(ITaskState taskState) {
+ public void setStateObject(IStateObject taskState) {
}
@Override
- public ITaskState getTaskState(TaskId taskId) {
+ public IStateObject getStateObject(Object id) {
return null;
}
}
\ No newline at end of file