[NO ISSUE][OTH] Appender flush call with tracing call normal flush
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- The flush with tracing now calls the normal flush. This enables
sub appenders overriding the flush call to maintain correctness.
Change-Id: I3f649798fa4cac049f66cc3621acdb28b1c94694
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2080
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index b1c7ff3..56a45c6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -49,6 +49,7 @@
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.PartitionWithMessageDataWriter;
import org.apache.hyracks.test.support.TestUtils;
+import org.apache.hyracks.util.trace.ITracer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -81,8 +82,9 @@
BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
RecordDescriptor rDesc = new RecordDescriptor(serdes);
TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
- IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
- CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+ PartitionWithMessageDataWriter partitioner =
+ (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+ CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
List<TestFrameWriter> recipients = new ArrayList<>();
try {
partitioner.open();
@@ -90,7 +92,7 @@
for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
recipients.add((TestFrameWriter) writer);
}
- partitioner.flush();
+ partitioner.flush(ITracer.NONE, null, null, null);
for (TestFrameWriter writer : recipients) {
Assert.assertEquals(writer.nextFrameCount(), 1);
fta.reset(writer.getLastFrame());
@@ -102,7 +104,7 @@
message.getBuffer().clear();
message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
message.getBuffer().flip();
- partitioner.flush();
+ partitioner.flush(ITracer.NONE, null, null, null);;
for (TestFrameWriter writer : recipients) {
Assert.assertEquals(writer.nextFrameCount(), 2);
fta.reset(writer.getLastFrame());
@@ -115,7 +117,7 @@
message.getBuffer().clear();
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
message.getBuffer().flip();
- partitioner.flush();
+ partitioner.flush(ITracer.NONE, null, null, null);;
for (TestFrameWriter writer : recipients) {
Assert.assertEquals(writer.nextFrameCount(), 3);
fta.reset(writer.getLastFrame());
@@ -159,15 +161,16 @@
BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
RecordDescriptor rDesc = new RecordDescriptor(serdes);
TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
- IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, CURRENT_PRODUCER,
- NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+ PartitionWithMessageDataWriter partitioner =
+ (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+ CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
partitioner.open();
FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
List<TestFrameWriter> recipients = new ArrayList<>();
for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
recipients.add((TestFrameWriter) writer);
}
- partitioner.flush();
+ partitioner.flush(ITracer.NONE, null, null, null);;
for (TestFrameWriter writer : recipients) {
Assert.assertEquals(writer.nextFrameCount(), 1);
fta.reset(writer.getLastFrame());
@@ -179,7 +182,7 @@
message.getBuffer().clear();
message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
message.getBuffer().flip();
- partitioner.flush();
+ partitioner.flush(ITracer.NONE, null, null, null);;
for (TestFrameWriter writer : recipients) {
Assert.assertEquals(writer.nextFrameCount(), 2);
fta.reset(writer.getLastFrame());
@@ -191,7 +194,7 @@
message.getBuffer().clear();
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
message.getBuffer().flip();
- partitioner.flush();
+ partitioner.flush(ITracer.NONE, null, null, null);;
for (TestFrameWriter writer : recipients) {
Assert.assertEquals(writer.nextFrameCount(), 3);
fta.reset(writer.getLastFrame());
@@ -262,7 +265,7 @@
tuple = ttg.next();
}
partitioner.nextFrame(frame.getBuffer());
- partitioner.flush();
+ partitioner.flush(ITracer.NONE, null, null, null);;
Assert.assertEquals(1, partitionWriterFactory.getWriters().get(0).nextFrameCount());
Assert.assertEquals(2, partitionWriterFactory.getWriters().get(1).nextFrameCount());
Assert.assertEquals(1, partitionWriterFactory.getWriters().get(2).nextFrameCount());
@@ -321,7 +324,7 @@
appender.append(tuple);
}
partitioner.nextFrame(frame.getBuffer());
- partitioner.flush();
+ partitioner.flush(ITracer.NONE, null, null, null);;
Assert.assertEquals(partitionWriterFactory.getWriters().get(0).nextFrameCount(), 1);
Assert.assertEquals(partitionWriterFactory.getWriters().get(1).nextFrameCount(), 1);
Assert.assertEquals(partitionWriterFactory.getWriters().get(2).nextFrameCount(), 1);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index 13632f0..a377d75 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -120,10 +120,10 @@
public void flush(IFrameWriter writer, ITracer tracer, String name, String cat, String args)
throws HyracksDataException {
final long tid = ITracer.check(tracer).durationB(name, cat, args);
- if (tupleCount > 0) {
- write(writer, true);
+ try {
+ flush(writer);
+ } finally {
+ ITracer.check(tracer).durationE(tid, args);
}
- writer.flush();
- ITracer.check(tracer).durationE(tid, args);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 189ce9d..4705001 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.util.trace.ITracer;
public class PartitionDataWriter implements IFrameWriter {
private final int consumerPartitionCount;
@@ -163,6 +164,14 @@
}
}
+ public void flush(ITracer tracer, String name, String cat, String args) throws HyracksDataException {
+ for (int i = 0; i < consumerPartitionCount; i++) {
+ if (allocatedFrames[i]) {
+ appenders[i].flush(pWriters[i], tracer, name, cat, args);
+ }
+ }
+ }
+
// Wraps the current encountered exception into the final exception.
private HyracksDataException wrapException(HyracksDataException finalException, Exception currentException) {
if (finalException == null) {