[NO ISSUE][RT] Window operator performance improvement
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Improve performance of window operator: keep only
the partitioning column values of the last tuple of the
previous frame instead of copying the whole frame
Change-Id: Ib8d6b3b99ee24c73b76fd118040ed6972e6798d9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3346
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
index c8f7cbf..d6b2585 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
@@ -78,6 +78,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
index 85585a0..e16ee13 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
@@ -28,11 +28,10 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
/**
@@ -144,20 +143,11 @@
}
}
- static PointableTupleReference createPointables(int ln) {
- IPointable[] pointables = new IPointable[ln];
- for (int i = 0; i < ln; i++) {
- pointables[i] = VoidPointable.FACTORY.createPointable();
- }
- return new PointableTupleReference(pointables);
- }
-
- static boolean allTrue(PointableTupleReference tupleRef, IBinaryBooleanInspector boolAccessor)
- throws HyracksDataException {
- for (int i = 0, ln = tupleRef.getFieldCount(); i < ln; i++) {
- IPointable field = tupleRef.getField(i);
- boolean b = boolAccessor.getBooleanValue(field.getByteArray(), field.getStartOffset(), field.getLength());
- if (!b) {
+ static boolean allTrue(ITupleReference tupleRef, IBinaryBooleanInspector boolAccessor) throws HyracksDataException {
+ for (int i = 0, n = tupleRef.getFieldCount(); i < n; i++) {
+ boolean v = boolAccessor.getBooleanValue(tupleRef.getFieldData(i), tupleRef.getFieldStart(i),
+ tupleRef.getFieldLength(i));
+ if (!v) {
return false;
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
index 9cc25d0..0afe212 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
@@ -24,16 +24,17 @@
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
import org.apache.hyracks.algebricks.runtime.operators.aggrun.AbstractRunningAggregatePushRuntime;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregatePushRuntime<IWindowAggregateEvaluator> {
@@ -42,9 +43,9 @@
private final IBinaryComparatorFactory[] partitionComparatorFactories;
private IBinaryComparator[] partitionComparators;
private final IBinaryComparatorFactory[] orderComparatorFactories;
- private IFrame copyFrame;
- private FrameTupleAccessor copyFrameAccessor;
private FrameTupleAccessor frameAccessor;
+ private FrameTupleReference partitionColumnsRef;
+ private PointableTupleReference partitionColumnsPrevCopy;
private long frameId;
private boolean inPartition;
@@ -60,7 +61,8 @@
}
/**
- * Number of frames reserved by this operator: {@link #frame}, {@link #copyFrame}
+ * Number of frames reserved by this operator: {@link #frame} + conservative estimate for
+ * {@link #partitionColumnsPrevCopy}
*/
int getReservedFrameCount() {
return 2;
@@ -78,9 +80,9 @@
super.init();
partitionComparators = createBinaryComparators(partitionComparatorFactories);
frameAccessor = new FrameTupleAccessor(inputRecordDesc);
- copyFrame = new VSizeFrame(ctx);
- copyFrameAccessor = new FrameTupleAccessor(inputRecordDesc);
- copyFrameAccessor.reset(copyFrame.getBuffer());
+ partitionColumnsRef = new PermutingFrameTupleReference(partitionColumns);
+ partitionColumnsPrevCopy =
+ PointableTupleReference.create(partitionColumns.length, ArrayBackedValueStorage::new);
IBinaryComparator[] orderComparators = createBinaryComparators(orderComparatorFactories);
for (IWindowAggregateEvaluator runningAggEval : runningAggEvals) {
runningAggEval.configure(orderComparators);
@@ -106,18 +108,19 @@
if (frameId == 0) {
beginPartition();
} else {
- boolean samePartition = PreclusteredGroupWriter.sameGroup(copyFrameAccessor,
- copyFrameAccessor.getTupleCount() - 1, frameAccessor, 0, partitionColumns, partitionComparators);
+ partitionColumnsRef.reset(frameAccessor, 0);
+ boolean samePartition = PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, partitionColumnsRef,
+ partitionComparators);
if (!samePartition) {
endPartition();
beginPartition();
}
}
- if (nTuple == 1) {
+ int tLastIndex = nTuple - 1;
+ if (tLastIndex == 0) {
partitionChunk(frameId, buffer, 0, 0);
} else {
int tBeginIndex = 0;
- int tLastIndex = nTuple - 1;
for (int tIndex = 1; tIndex <= tLastIndex; tIndex++) {
boolean samePartition = PreclusteredGroupWriter.sameGroup(frameAccessor, tIndex - 1, frameAccessor,
tIndex, partitionColumns, partitionComparators);
@@ -131,9 +134,8 @@
partitionChunk(frameId, buffer, tBeginIndex, tLastIndex);
}
- copyFrame.resize(buffer.capacity());
- FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
- copyFrameAccessor.reset(copyFrame.getBuffer());
+ partitionColumnsRef.reset(frameAccessor, tLastIndex);
+ partitionColumnsPrevCopy.set(partitionColumnsRef);
frameId++;
}
@@ -167,7 +169,7 @@
}
}
- protected static IBinaryComparator[] createBinaryComparators(IBinaryComparatorFactory[] factories) {
+ static IBinaryComparator[] createBinaryComparators(IBinaryComparatorFactory[] factories) {
IBinaryComparator[] comparators = new IBinaryComparator[factories.length];
for (int i = 0; i < factories.length; i++) {
comparators[i] = factories[i].createBinaryComparator();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
index cb86c5f..1a57e27 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
@@ -188,28 +188,33 @@
if (frameValueExists) {
frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
- frameValuePointables = createPointables(frameValueEvalFactories.length);
+ frameValuePointables =
+ PointableTupleReference.create(frameValueEvalFactories.length, VoidPointable.FACTORY);
}
if (frameStartExists) {
frameStartEvals = createEvaluators(frameStartEvalFactories, ctx);
- frameStartPointables = createPointables(frameStartEvalFactories.length);
+ frameStartPointables =
+ PointableTupleReference.create(frameStartEvalFactories.length, VoidPointable.FACTORY);
}
if (frameStartValidationExists) {
frameStartValidationEvals = createEvaluators(frameStartValidationEvalFactories, ctx);
- frameStartValidationPointables = createPointables(frameStartValidationEvalFactories.length);
+ frameStartValidationPointables =
+ PointableTupleReference.create(frameStartValidationEvalFactories.length, VoidPointable.FACTORY);
}
if (frameEndExists) {
frameEndEvals = createEvaluators(frameEndEvalFactories, ctx);
- frameEndPointables = createPointables(frameEndEvalFactories.length);
+ frameEndPointables = PointableTupleReference.create(frameEndEvalFactories.length, VoidPointable.FACTORY);
}
if (frameEndValidationExists) {
frameEndValidationEvals = createEvaluators(frameEndValidationEvalFactories, ctx);
- frameEndValidationPointables = createPointables(frameEndValidationEvalFactories.length);
+ frameEndValidationPointables =
+ PointableTupleReference.create(frameEndValidationEvalFactories.length, VoidPointable.FACTORY);
}
if (frameExcludeExists) {
frameExcludeEvals = createEvaluators(frameExcludeEvalFactories, ctx);
frameExcludeComparators = createBinaryComparators(frameExcludeComparatorFactories);
- frameExcludePointables = createPointables(frameExcludeEvalFactories.length);
+ frameExcludePointables =
+ PointableTupleReference.create(frameExcludeEvalFactories.length, VoidPointable.FACTORY);
frameExcludePointable2 = VoidPointable.FACTORY.createPointable();
}
if (frameOffsetExists) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
index 7ce9668..5ed6861 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
@@ -117,12 +118,13 @@
super.init();
frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
- frameValuePointables = createPointables(frameValueEvalFactories.length);
+ frameValuePointables = PointableTupleReference.create(frameValueEvalFactories.length, VoidPointable.FACTORY);
frameEndEvals = createEvaluators(frameEndEvalFactories, ctx);
- frameEndPointables = createPointables(frameEndEvalFactories.length);
+ frameEndPointables = PointableTupleReference.create(frameEndEvalFactories.length, VoidPointable.FACTORY);
if (frameEndValidationExists) {
frameEndValidationEvals = createEvaluators(frameEndValidationEvalFactories, ctx);
- frameEndValidationPointables = createPointables(frameEndValidationEvalFactories.length);
+ frameEndValidationPointables =
+ PointableTupleReference.create(frameEndValidationEvalFactories.length, VoidPointable.FACTORY);
booleanAccessor = booleanAccessorFactory.createBinaryBooleanInspector(ctx);
nestedAggForInvalidFrame = nestedAggCreate();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
index 1d947c1..09b1ecf 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
@@ -19,7 +19,10 @@
package org.apache.hyracks.dataflow.common.data.accessors;
+import java.util.function.Supplier;
+
import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IPointableFactory;
/**
* A tuple reference implementation that holds fields in a {@link IPointable} array
@@ -55,4 +58,22 @@
public IPointable getField(int fIdx) {
return fields[fIdx];
}
+
+ public void set(ITupleReference tupleRef) { // sets each field of this tuple from the same-index field of tupleRef
+ for (int i = 0; i < fields.length; i++) { // bounded by this tuple's own field count; tupleRef's field count is not checked
+ fields[i].set(tupleRef.getFieldData(i), tupleRef.getFieldStart(i), tupleRef.getFieldLength(i)); // copy vs. reference semantics depend on the IPointable implementation
+ }
+ }
+
+ public static PointableTupleReference create(int fieldCount, Supplier<IPointable> fieldFactory) { // factory: builds a tuple reference with fieldCount fields
+ IPointable[] fields = new IPointable[fieldCount];
+ for (int i = 0; i < fieldCount; i++) {
+ fields[i] = fieldFactory.get(); // one fieldFactory.get() call per field slot
+ }
+ return new PointableTupleReference(fields);
+ }
+
+ public static PointableTupleReference create(int fieldCount, IPointableFactory fieldFactory) { // overload taking an IPointableFactory
+ return create(fieldCount, fieldFactory::createPointable); // adapts the factory to the Supplier-based overload
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index c018e9d..cb27011 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.std.group.AggregateState;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -188,6 +189,17 @@
return true;
}
+ public static boolean sameGroup(ITupleReference a1, ITupleReference a2, IBinaryComparator[] comparators) // true iff all compared fields of a1 and a2 are equal
+ throws HyracksDataException {
+ for (int i = 0; i < comparators.length; ++i) { // compares fields 0..comparators.length-1; comparators[i] is applied to field i of both tuples
+ if (comparators[i].compare(a1.getFieldData(i), a1.getFieldStart(i), a1.getFieldLength(i),
+ a2.getFieldData(i), a2.getFieldStart(i), a2.getFieldLength(i)) != 0) {
+ return false; // first non-zero comparison means the tuples are in different groups
+ }
+ }
+ return true; // also reached when comparators is empty
+ }
+
@Override
public void fail() throws HyracksDataException {
isFailed = true;