[ASTERIXDB-2688][HYR] Fix use of a Hyracks task across join stages
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
In hash join, a task from the build stage was being used in the probe stage.
This is a problem because such a task has already finished and notified the CC
that it is done. One observed symptom involves warnings issued in the probe
phase: some warnings were never reported because they were attributed to tasks
that had already finished (a comparator was created in the build phase using
the build-phase task context; when that comparator was later used in the probe
phase, its warning went to the finished build task and was lost).
- make IHyracksJobletContext extend IHyracksCommonContext so that it is also a frame manager context
- make activities of join operators use the joblet context instead of the task context for acquiring buffers
- create the probe-to-build comparator in the probe phase so that the comparator is bound to the live probe task (a sketch of the pattern follows this list)
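As a minimal sketch of the pattern, distilled from the probe activity's open()
in InMemoryHashJoinOperatorDescriptor (shown in the diff below); state, writer,
and partition are fields of the surrounding anonymous operator node pushable:

    @Override
    public void open() throws HyracksDataException {
        writer.open();
        // the joiner was created in the build phase against the joblet context,
        // so it holds no reference to the (now finished) build task
        state = (HashBuildTaskState) ctx
                .getStateObject(new TaskId(new ActivityId(getOperatorId(), 0), partition));
        // the comparator is created here, in the probe phase, so any warning it
        // issues is reported against the live probe task
        state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx));
    }

NestedLoopJoin and OptimizedHybridHashJoin follow the same pattern via
setComparator(...) and initProbe(comparator), respectively.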
Change-Id: I38a4a779b9620494f15606162f0f1e9487fd0984
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4563
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 7a59926..41fed25 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -26,7 +26,7 @@
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatableRegistry;
-public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry {
+public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry, IHyracksCommonContext {
INCServiceContext getServiceContext();
JobId getJobId();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 35cf57f..02cd184 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -240,11 +240,13 @@
nodeController.getExecutor().execute(() -> deallocatableRegistry.close());
}
- ByteBuffer allocateFrame() throws HyracksDataException {
+ @Override
+ public ByteBuffer allocateFrame() throws HyracksDataException {
return frameManager.allocateFrame();
}
- ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
+ @Override
+ public ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
if (serviceCtx.getMemoryManager().allocate(bytes)) {
memoryAllocation.addAndGet(bytes);
return frameManager.allocateFrame(bytes);
@@ -252,18 +254,21 @@
throw new HyracksDataException("Unable to allocate frame: Not enough memory");
}
- ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newFrameSizeInBytes, boolean copyOldData)
+ @Override
+ public ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newFrameSizeInBytes, boolean copyOldData)
throws HyracksDataException {
return frameManager.reallocateFrame(usedBuffer, newFrameSizeInBytes, copyOldData);
}
- void deallocateFrames(int bytes) {
+ @Override
+ public void deallocateFrames(int bytes) {
memoryAllocation.addAndGet(bytes);
serviceCtx.getMemoryManager().deallocate(bytes);
frameManager.deallocateFrames(bytes);
}
- public final int getFrameSize() {
+ @Override
+ public final int getInitialFrameSize() {
return frameManager.getInitialFrameSize();
}
@@ -271,7 +276,8 @@
return maxWarnings;
}
- public IIOManager getIOManager() {
+ @Override
+ public IIOManager getIoManager() {
return serviceCtx.getIoManager();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index e58e4a4..158e24e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -139,7 +139,7 @@
this.taskAttemptId = taskId;
this.displayName = displayName;
this.executorService = executor;
- fileFactory = new WorkspaceFileFactory(this, joblet.getIOManager());
+ fileFactory = new WorkspaceFileFactory(this, joblet.getIoManager());
deallocatableRegistry = new DefaultDeallocatableRegistry();
counterMap = new HashMap<>();
opEnv = joblet.getEnvironment();
@@ -181,12 +181,12 @@
@Override
public int getInitialFrameSize() {
- return joblet.getFrameSize();
+ return joblet.getInitialFrameSize();
}
@Override
public IIOManager getIoManager() {
- return joblet.getIOManager();
+ return joblet.getIoManager();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index eadbcf7..fb2d4e9 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -26,7 +26,7 @@
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
@@ -49,35 +49,35 @@
private final List<ByteBuffer> buffers;
private final FrameTupleAccessor accessorBuild;
private final ITuplePartitionComputer tpcBuild;
- private IFrameTupleAccessor accessorProbe;
+ private final IFrameTupleAccessor accessorProbe;
private final ITuplePartitionComputer tpcProbe;
private final FrameTupleAppender appender;
- private final ITuplePairComparator tpComparator;
+ private ITuplePairComparator tpComparator;
private final boolean isLeftOuter;
private final ArrayTupleBuilder missingTupleBuild;
private final ISerializableTable table;
private final TuplePointer storedTuplePointer;
private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
private final IPredicateEvaluator predEvaluator;
- private TupleInFrameListAccessor tupleAccessor;
+ private final TupleInFrameListAccessor tupleAccessor;
// To release frames
- ISimpleFrameBufferManager bufferManager;
+ private final ISimpleFrameBufferManager bufferManager;
private final boolean isTableCapacityNotZero;
private static final Logger LOGGER = LogManager.getLogger();
- public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe,
- FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild,
- ITuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
+ public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
+ ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
+ ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
ISerializableTable table, IPredicateEvaluator predEval, ISimpleFrameBufferManager bufferManager)
throws HyracksDataException {
- this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, comparator, isLeftOuter,
- missingWritersBuild, table, predEval, false, bufferManager);
+ this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, isLeftOuter, missingWritersBuild, table,
+ predEval, false, bufferManager);
}
- public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe,
- FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild,
- ITuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
+ public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
+ ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
+ ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
ISerializableTable table, IPredicateEvaluator predEval, boolean reverse,
ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
this.table = table;
@@ -88,7 +88,6 @@
this.accessorProbe = accessorProbe;
this.tpcProbe = tpcProbe;
appender = new FrameTupleAppender(new VSizeFrame(ctx));
- tpComparator = comparator;
predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
if (isLeftOuter) {
@@ -105,11 +104,7 @@
reverseOutputOrder = reverse;
this.tupleAccessor = new TupleInFrameListAccessor(rDBuild, buffers);
this.bufferManager = bufferManager;
- if (table.getTableSize() != 0) {
- isTableCapacityNotZero = true;
- } else {
- isTableCapacityNotZero = false;
- }
+ this.isTableCapacityNotZero = table.getTableSize() != 0;
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("InMemoryHashJoin has been created for a table size of " + table.getTableSize()
+ " for Thread ID " + Thread.currentThread().getId() + ".");
@@ -126,6 +121,7 @@
storedTuplePointer.reset(bIndex, i);
// If an insertion fails, then tries to insert the same tuple pointer again after compacting the table.
if (!table.insert(entry, storedTuplePointer)) {
+ // TODO(ali): should check if insertion failed even after compaction and take action
compactTableAndInsertAgain(entry, storedTuplePointer);
}
}
@@ -152,6 +148,15 @@
}
/**
+ * Must be called before starting to join to set the right comparator with the right context.
+ *
+ * @param comparator the comparator to use for comparing the probe tuples against the build tuples
+ */
+ void setComparator(ITuplePairComparator comparator) {
+ tpComparator = comparator;
+ }
+
+ /**
* Reads the given tuple from the probe side and joins it with tuples from the build side.
* This method assumes that the accessorProbe is already set to the current probe frame.
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index ccca62d..33976a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -34,7 +35,6 @@
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -74,18 +74,8 @@
IBinaryHashFunctionFactory[] hashFunctionFactories0, IBinaryHashFunctionFactory[] hashFunctionFactories1,
ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int tableSize,
IPredicateEvaluatorFactory predEvalFactory, int memSizeInFrames) {
- super(spec, 2, 1);
- this.keys0 = keys0;
- this.keys1 = keys1;
- this.hashFunctionFactories0 = hashFunctionFactories0;
- this.hashFunctionFactories1 = hashFunctionFactories1;
- this.comparatorFactory = comparatorFactory;
- this.predEvaluatorFactory = predEvalFactory;
- outRecDescs[0] = recordDescriptor;
- this.isLeftOuter = false;
- this.nonMatchWriterFactories = null;
- this.tableSize = tableSize;
- this.memSizeInFrames = memSizeInFrames;
+ this(spec, keys0, keys1, hashFunctionFactories0, hashFunctionFactories1, comparatorFactory, predEvalFactory,
+ recordDescriptor, false, null, tableSize, memSizeInFrames);
}
public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
@@ -100,7 +90,7 @@
this.hashFunctionFactories1 = hashFunctionFactories1;
this.comparatorFactory = comparatorFactory;
this.predEvaluatorFactory = predEvalFactory;
- outRecDescs[0] = recordDescriptor;
+ this.outRecDescs[0] = recordDescriptor;
this.isLeftOuter = isLeftOuter;
this.nonMatchWriterFactories = missingWriterFactories1;
this.tableSize = tableSize;
@@ -125,11 +115,8 @@
builder.addBlockingEdge(hba, hpa);
}
- public static class HashBuildTaskState extends AbstractStateObject {
- private InMemoryHashJoin joiner;
-
- public HashBuildTaskState() {
- }
+ static class HashBuildTaskState extends AbstractStateObject {
+ InMemoryHashJoin joiner;
private HashBuildTaskState(JobId jobId, TaskId taskId) {
super(jobId, taskId);
@@ -160,21 +147,23 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
+ final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(hpaId, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
- final IMissingWriter[] nullWriters1 =
- isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
+ final IMissingWriter[] nullWriters1;
if (isLeftOuter) {
+ nullWriters1 = new IMissingWriter[nonMatchWriterFactories.length];
for (int i = 0; i < nonMatchWriterFactories.length; i++) {
nullWriters1[i] = nonMatchWriterFactories[i].createMissingWriter();
}
+ } else {
+ nullWriters1 = null;
}
final IPredicateEvaluator predEvaluator =
(predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
- final int memSizeInBytes = memSizeInFrames * ctx.getInitialFrameSize();
- final IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx, memSizeInBytes);
+ final int memSizeInBytes = memSizeInFrames * jobletCtx.getInitialFrameSize();
+ final IDeallocatableFramePool framePool = new DeallocatableFramePool(jobletCtx, memSizeInBytes);
final ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
@@ -186,12 +175,11 @@
new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories0).createPartitioner(ctx);
ITuplePartitionComputer hpc1 =
new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories1).createPartitioner(ctx);
- state = new HashBuildTaskState(ctx.getJobletContext().getJobId(),
- new TaskId(getActivityId(), partition));
- ISerializableTable table = new SerializableHashTable(tableSize, ctx, bufferManager);
- state.joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0), hpc0,
- new FrameTupleAccessor(rd1), rd1, hpc1, comparator, isLeftOuter, nullWriters1, table,
- predEvaluator, bufferManager);
+ state = new HashBuildTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
+ ISerializableTable table = new SerializableHashTable(tableSize, jobletCtx, bufferManager);
+ state.joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(rd0), hpc0,
+ new FrameTupleAccessor(rd1), rd1, hpc1, isLeftOuter, nullWriters1, table, predEvaluator,
+ bufferManager);
}
@Override
@@ -250,6 +238,7 @@
writer.open();
state = (HashBuildTaskState) ctx
.getStateObject(new TaskId(new ActivityId(getOperatorId(), 0), partition));
+ state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx));
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index d0f5a73..361d1ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
@@ -46,7 +46,7 @@
private final FrameTupleAccessor accessorInner;
private final FrameTupleAccessor accessorOuter;
private final FrameTupleAppender appender;
- private final ITuplePairComparator tpComparator;
+ private ITuplePairComparator tpComparator;
private final IFrame outBuffer;
private final IFrame innerBuffer;
private final VariableFrameMemoryManager outerBufferMngr;
@@ -55,24 +55,23 @@
private final ArrayTupleBuilder missingTupleBuilder;
private final IPredicateEvaluator predEvaluator;
private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
- private BufferInfo tempInfo = new BufferInfo(null, -1, -1);
+ private final BufferInfo tempInfo = new BufferInfo(null, -1, -1);
- public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorOuter, FrameTupleAccessor accessorInner,
- ITuplePairComparator comparatorsOuter2Inner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
+ public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
+ FrameTupleAccessor accessorInner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
IMissingWriter[] missingWriters) throws HyracksDataException {
this.accessorInner = accessorInner;
this.accessorOuter = accessorOuter;
this.appender = new FrameTupleAppender();
- this.tpComparator = comparatorsOuter2Inner;
- this.outBuffer = new VSizeFrame(ctx);
- this.innerBuffer = new VSizeFrame(ctx);
+ this.outBuffer = new VSizeFrame(jobletContext);
+ this.innerBuffer = new VSizeFrame(jobletContext);
this.appender.reset(outBuffer, true);
if (memSize < 3) {
throw new HyracksDataException("Not enough memory is available for Nested Loop Join");
}
- this.outerBufferMngr =
- new VariableFrameMemoryManager(new VariableFramePool(ctx, ctx.getInitialFrameSize() * (memSize - 2)),
- FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2));
+ this.outerBufferMngr = new VariableFrameMemoryManager(
+ new VariableFramePool(jobletContext, jobletContext.getInitialFrameSize() * (memSize - 2)),
+ FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2));
this.predEvaluator = predEval;
this.isReversed = false;
@@ -91,8 +90,8 @@
}
FileReference file =
- ctx.getJobletContext().createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
- runFileWriter = new RunFileWriter(file, ctx.getIoManager());
+ jobletContext.createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
+ runFileWriter = new RunFileWriter(file, jobletContext.getIoManager());
runFileWriter.open();
}
@@ -100,6 +99,15 @@
runFileWriter.nextFrame(buffer);
}
+ /**
+ * Must be called before starting to join to set the right comparator with the right context.
+ *
+ * @param comparator the comparator to use for comparing the probe tuples against the build tuples
+ */
+ void setComparator(ITuplePairComparator comparator) {
+ tpComparator = comparator;
+ }
+
public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
RunFileReader runFileReader = runFileWriter.createReader();
@@ -131,7 +139,7 @@
for (int i = 0; i < tupleCount0; ++i) {
boolean matchFound = false;
for (int j = 0; j < tupleCount1; ++j) {
- int c = compare(accessorOuter, i, accessorInner, j);
+ int c = tpComparator.compare(accessorOuter, i, accessorInner, j);
boolean prdEval = evaluatePredicate(i, j);
if (c == 0 && prdEval) {
matchFound = true;
@@ -195,15 +203,6 @@
outerBufferMngr.reset();
}
- private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
- throws HyracksDataException {
- int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
- if (c != 0) {
- return c;
- }
- return 0;
- }
-
public void setIsReversed(boolean b) {
this.isReversed = b;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 2236056..1de8094 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -31,7 +32,6 @@
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -114,9 +114,9 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
+ final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
final IPredicateEvaluator predEvaluator =
(predEvaluatorFactory != null) ? predEvaluatorFactory.createPredicateEvaluator() : null;
@@ -132,17 +132,15 @@
@Override
public void open() throws HyracksDataException {
- state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(),
- new TaskId(getActivityId(), partition));
-
- state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0), new FrameTupleAccessor(rd1),
- comparator, memSize, predEvaluator, isLeftOuter, nullWriters1);
+ state = new JoinCacheTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
+ state.joiner = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(rd0),
+ new FrameTupleAccessor(rd1), memSize, predEvaluator, isLeftOuter, nullWriters1);
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity());
+ ByteBuffer copyBuffer = jobletCtx.allocateFrame(buffer.capacity());
FrameUtils.copyAndFlip(buffer, copyBuffer);
state.joiner.cache(copyBuffer);
}
@@ -180,6 +178,7 @@
writer.open();
state = (JoinCacheTaskState) ctx.getStateObject(
new TaskId(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID), partition));
+ state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx));
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 7b6dcdb..9c28c61 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
@@ -62,11 +62,10 @@
PROBE
}
- private final IHyracksTaskContext ctx;
+ private final IHyracksJobletContext jobletCtx;
private final String buildRelName;
private final String probeRelName;
- private final ITuplePairComparator comparator;
private final ITuplePartitionComputer buildHpc;
private final ITuplePartitionComputer probeHpc;
private final RecordDescriptor buildRd;
@@ -95,17 +94,16 @@
private final TuplePointer tempPtr = new TuplePointer();
private int[] probePSizeInTups;
- public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
- String probeRelName, String buildRelName, ITuplePairComparator comparator, RecordDescriptor probeRd,
- RecordDescriptor buildRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
- IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
- this.ctx = ctx;
+ public OptimizedHybridHashJoin(IHyracksJobletContext jobletCtx, int memSizeInFrames, int numOfPartitions,
+ String probeRelName, String buildRelName, RecordDescriptor probeRd, RecordDescriptor buildRd,
+ ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval,
+ boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
+ this.jobletCtx = jobletCtx;
this.memSizeInFrames = memSizeInFrames;
this.buildRd = buildRd;
this.probeRd = probeRd;
this.buildHpc = buildHpc;
this.probeHpc = probeHpc;
- this.comparator = comparator;
this.buildRelName = buildRelName;
this.probeRelName = probeRelName;
this.numOfPartitions = numOfPartitions;
@@ -127,7 +125,7 @@
public void initBuild() throws HyracksDataException {
IDeallocatableFramePool framePool =
- new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
+ new DeallocatableFramePool(jobletCtx, memSizeInFrames * jobletCtx.getInitialFrameSize());
bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
bufferManager = new VPartitionTupleBufferManager(
PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
@@ -177,8 +175,8 @@
int pid) throws HyracksDataException {
RunFileWriter writer = runFileWriters[pid];
if (writer == null) {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
- writer = new RunFileWriter(file, ctx.getIoManager());
+ FileReference file = jobletCtx.createManagedWorkspaceFile(refName);
+ writer = new RunFileWriter(file, jobletCtx.getIoManager());
writer.open();
runFileWriters[pid] = writer;
}
@@ -194,10 +192,10 @@
// and tries to bring back as many spilled partitions as possible if there is free space.
int inMemTupCount = makeSpaceForHashTableAndBringBackSpilledPartitions();
- ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
- this.inMemJoiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRd), probeHpc,
- new FrameTupleAccessor(buildRd), buildRd, buildHpc, comparator, isLeftOuter, nonMatchWriters, table,
- predEvaluator, isReversed, bufferManagerForHashTable);
+ ISerializableTable table = new SerializableHashTable(inMemTupCount, jobletCtx, bufferManagerForHashTable);
+ this.inMemJoiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRd), probeHpc,
+ new FrameTupleAccessor(buildRd), buildRd, buildHpc, isLeftOuter, nonMatchWriters, table, predEvaluator,
+ isReversed, bufferManagerForHashTable);
buildHashTable();
}
@@ -250,7 +248,7 @@
* @throws HyracksDataException
*/
private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
- int frameSize = ctx.getInitialFrameSize();
+ int frameSize = jobletCtx.getInitialFrameSize();
long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize;
int inMemTupCount = 0;
@@ -356,7 +354,7 @@
* @return partition id of selected partition to reload
*/
private int selectAPartitionToReload(long freeSpace, int pid, int inMemTupCount) {
- int frameSize = ctx.getInitialFrameSize();
+ int frameSize = jobletCtx.getInitialFrameSize();
// Add one frame to freeSpace to consider the one frame reserved for the spilled partition
long totalFreeSpace = freeSpace + frameSize;
if (totalFreeSpace > 0) {
@@ -379,7 +377,7 @@
try {
r.open();
if (reloadBuffer == null) {
- reloadBuffer = new VSizeFrame(ctx);
+ reloadBuffer = new VSizeFrame(jobletCtx);
}
while (r.nextFrame(reloadBuffer)) {
accessorBuild.reset(reloadBuffer.getBuffer());
@@ -430,8 +428,9 @@
}
}
- public void initProbe() {
+ public void initProbe(ITuplePairComparator comparator) {
probePSizeInTups = new int[numOfPartitions];
+ inMemJoiner.setComparator(comparator);
}
public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
@@ -464,7 +463,7 @@
VPartitionTupleBufferManager.calculateActualSize(null, accessorProbe.getTupleLength(tupleId));
// If the partition is at least half-full and insertion fails, that partition is preferred to get
// spilled, otherwise the biggest partition gets chosen as the victim.
- boolean modestCase = recordSize <= (ctx.getInitialFrameSize() / 2);
+ boolean modestCase = recordSize <= (jobletCtx.getInitialFrameSize() / 2);
int victim = (modestCase && bufferManager.getNumTuples(pid) > 0) ? pid
: spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
// This method is called for the spilled partitions, so we know that this tuple is going to get written to
@@ -492,7 +491,7 @@
private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
throws HyracksDataException {
if (bigProbeFrameAppender == null) {
- bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+ bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx));
}
RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(probeRFWriters, probeRelName, pid);
if (!bigProbeFrameAppender.append(accessorProbe, i)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 45cccec..1819b8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -190,7 +191,7 @@
}
//memorySize is the memory for join (we have already excluded the 2 buffers for in/out)
- private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
+ private static int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
throws HyracksDataException {
int numberOfPartitions = 0;
if (memorySize <= 2) {
@@ -260,8 +261,6 @@
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
- final ITuplePairComparator probComparator =
- tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
final IPredicateEvaluator predEvaluator =
(predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
@@ -284,9 +283,9 @@
state.memForJoin = memSizeInFrames - 2;
state.numOfPartitions =
getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
- state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
- PROBE_REL, BUILD_REL, probComparator, probeRd, buildRd, probeHpc, buildHpc, predEvaluator,
- isLeftOuter, nonMatchWriterFactories);
+ state.hybridHJ = new OptimizedHybridHashJoin(ctx.getJobletContext(), state.memForJoin,
+ state.numOfPartitions, PROBE_REL, BUILD_REL, probeRd, buildRd, probeHpc, buildHpc,
+ predEvaluator, isLeftOuter, nonMatchWriterFactories);
state.hybridHJ.initBuild();
if (LOGGER.isTraceEnabled()) {
@@ -373,8 +372,9 @@
}
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
private BuildAndPartitionTaskState state;
- private IFrame rPartbuff = new VSizeFrame(ctx);
+ private IFrame rPartbuff = new VSizeFrame(jobletCtx);
private FrameTupleAppender nullResultAppender = null;
private FrameTupleAccessor probeTupleAccessor;
@@ -386,7 +386,7 @@
new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
writer.open();
- state.hybridHJ.initProbe();
+ state.hybridHJ.initProbe(probComp);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase.");
@@ -480,7 +480,7 @@
new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories)
.createPartitioner(level);
- int frameSize = ctx.getInitialFrameSize();
+ int frameSize = jobletCtx.getInitialFrameSize();
long buildPartSize = (long) Math.ceil((double) buildSideReader.getFileSize() / (double) frameSize);
long probePartSize = (long) Math.ceil((double) probeSideReader.getFileSize() / (double) frameSize);
int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple);
@@ -575,7 +575,7 @@
assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
OptimizedHybridHashJoin rHHj;
int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
- rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, comp, probeRd,
+ rHHj = new OptimizedHybridHashJoin(jobletCtx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeRd,
buildRd, probeHpc, buildHpc, predEvaluator, isLeftOuter, nonMatchWriterFactories); //checked-confirmed
rHHj.setIsReversed(isReversed);
@@ -598,7 +598,7 @@
probeSideReader.open();
rPartbuff.reset();
try {
- rHHj.initProbe();
+ rHHj.initProbe(comp);
while (probeSideReader.nextFrame(rPartbuff)) {
rHHj.probe(rPartbuff.getBuffer(), writer);
}
@@ -696,7 +696,7 @@
private void appendNullToProbeTuples(RunFileReader probReader) throws HyracksDataException {
if (nullResultAppender == null) {
- nullResultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+ nullResultAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx));
}
if (probeTupleAccessor == null) {
probeTupleAccessor = new FrameTupleAccessor(probeRd);
@@ -725,14 +725,14 @@
&& bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
IDeallocatableFramePool framePool =
- new DeallocatableFramePool(ctx, state.memForJoin * ctx.getInitialFrameSize());
+ new DeallocatableFramePool(jobletCtx, state.memForJoin * jobletCtx.getInitialFrameSize());
ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
- ISerializableTable table = new SerializableHashTable(tabSize, ctx, bufferManager);
- InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRDesc), hpcRepProbe,
- new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, comp, isLeftOuter,
+ ISerializableTable table = new SerializableHashTable(tabSize, jobletCtx, bufferManager);
+ InMemoryHashJoin joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRDesc),
+ hpcRepProbe, new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, isLeftOuter,
nonMatchWriter, table, predEvaluator, isReversed, bufferManager);
-
+ joiner.setComparator(comp);
try {
bReader.open();
rPartbuff.reset();
@@ -788,12 +788,12 @@
boolean isReversed = outerRd == buildRd && innerRd == probeRd;
assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
ITuplePairComparator nljComptorOuterInner = isReversed ? buildComp : probComp;
- NestedLoopJoin nlj =
- new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd), new FrameTupleAccessor(innerRd),
- nljComptorOuterInner, memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
+ NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(outerRd),
+ new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
nlj.setIsReversed(isReversed);
+ nlj.setComparator(nljComptorOuterInner);
- IFrame cacheBuff = new VSizeFrame(ctx);
+ IFrame cacheBuff = new VSizeFrame(jobletCtx);
try {
innerReader.open();
while (innerReader.nextFrame(cacheBuff)) {
@@ -808,7 +808,7 @@
}
}
try {
- IFrame joinBuff = new VSizeFrame(ctx);
+ IFrame joinBuff = new VSizeFrame(jobletCtx);
outerReader.open();
try {
while (outerReader.nextFrame(joinBuff)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
index 4c6b70f..93739df 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -50,7 +50,7 @@
public class OptimizedHybridHashJoinTest {
int frameSize = 32768;
int totalNumberOfFrames = 10;
- IHyracksTaskContext ctx = TestUtils.create(frameSize);
+ IHyracksJobletContext ctx = TestUtils.create(frameSize).getJobletContext();
OptimizedHybridHashJoin hhj;
static IBinaryHashFunctionFamily[] propHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
static IBinaryHashFunctionFamily[] buildHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
@@ -150,8 +150,8 @@
private void testJoin(int memSizeInFrames, int numOfPartitions, VSizeFrame frame) throws HyracksDataException {
- hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, comparator,
- probeRd, buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
+ hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, probeRd,
+ buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
hhj.initBuild();
@@ -184,7 +184,7 @@
//to the in memory joiner. As such, only next frame is important.
}
};
- hhj.initProbe();
+ hhj.initProbe(comparator);
for (int i = 0; i < totalNumberOfFrames; i++) {
hhj.probe(frame.getBuffer(), writer);
checkOneFrameReservedPerSpilledPartitions();
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index 2809343..c485b32 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -34,31 +34,33 @@
import org.apache.hyracks.control.nc.resources.memory.FrameManager;
public class TestJobletContext implements IHyracksJobletContext {
- private final int frameSize;
+
private final INCServiceContext serviceContext;
private final FrameManager frameManger;
- private JobId jobId;
- private WorkspaceFileFactory fileFactory;
+ private final JobId jobId;
+ private final WorkspaceFileFactory fileFactory;
private final long jobStartTime;
- public TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
- this.frameSize = frameSize;
+ TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
this.serviceContext = serviceContext;
this.jobId = jobId;
- fileFactory = new WorkspaceFileFactory(this, getIOManager());
+ fileFactory = new WorkspaceFileFactory(this, getIoManager());
this.frameManger = new FrameManager(frameSize);
this.jobStartTime = System.currentTimeMillis();
}
- ByteBuffer allocateFrame() throws HyracksDataException {
+ @Override
+ public ByteBuffer allocateFrame() throws HyracksDataException {
return frameManger.allocateFrame();
}
+ @Override
public ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
return frameManger.allocateFrame(bytes);
}
- ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData)
+ @Override
+ public ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData)
throws HyracksDataException {
return frameManger.reallocateFrame(tobeDeallocate, newFrameSizeInBytes, copyOldData);
}
@@ -67,15 +69,18 @@
return null;
}
- void deallocateFrames(int bytes) {
+ @Override
+ public void deallocateFrames(int bytes) {
frameManger.deallocateFrames(bytes);
}
- public int getFrameSize() {
- return frameSize;
+ @Override
+ public final int getInitialFrameSize() {
+ return frameManger.getInitialFrameSize();
}
- public IIOManager getIOManager() {
+ @Override
+ public IIOManager getIoManager() {
return serviceContext.getIoManager();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 7c602f5..dcb85f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -85,12 +85,12 @@
@Override
public int getInitialFrameSize() {
- return jobletContext.getFrameSize();
+ return jobletContext.getInitialFrameSize();
}
@Override
public IIOManager getIoManager() {
- return jobletContext.getIOManager();
+ return jobletContext.getIoManager();
}
@Override