Improve Messaging Connector and Add Unit Tests
Before this change, messaging connector always reserves 100 bytes
for messages which are mostly un-used. With this change, it only
reserves two bytes and sends null messages by default. In case a
new message doesn't fit in the leftover space of a frame, it sends
the frame with a null message, followed by a dedicated frame for
the message.
Change-Id: If4336e9c234e8d282798cfba9f48432b46cccfca
Reviewed-on: https://asterix-gerrit.ics.uci.edu/880
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 99f1c2f..76d05d5 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -286,5 +286,10 @@
<artifactId>asterix-external-data</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-test-support</artifactId>
+ <version>0.2.18-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
new file mode 100644
index 0000000..e267cc7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.common;
+
+import java.util.Random;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.GrowableArray;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+public class TestTupleGenerator {
+ private final int stringFieldSizes;
+ private final FieldType[] types;
+ private final boolean reuseObject;
+ private final Random random = new Random();
+ private ITupleReference tuple;
+ private UTF8StringSerializerDeserializer stringSerde =
+ new UTF8StringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+ private GrowableArray[] fields;
+
+ public enum FieldType {
+ Integer64,
+ Boolean,
+ Double,
+ String
+ }
+
+ public TestTupleGenerator(FieldType[] types, int stringFieldSizes, boolean resueObject) {
+ this.types = types;
+ this.stringFieldSizes = stringFieldSizes;
+ this.reuseObject = resueObject;
+ this.fields = new GrowableArray[types.length];
+ for (int i = 0; i < types.length; i++) {
+ fields[i] = new GrowableArray();
+ }
+ tuple = new TestTupleReference(fields);
+ }
+
+ public ITupleReference next() throws HyracksDataException {
+ if (reuseObject) {
+ for (int i = 0; i < types.length; i++) {
+ fields[i].reset();
+ }
+ } else {
+ this.fields = new GrowableArray[types.length];
+ for (int i = 0; i < types.length; i++) {
+ fields[i] = new GrowableArray();
+ }
+ tuple = new TestTupleReference(fields);
+ }
+ for (int i = 0; i < types.length; i++) {
+ FieldType type = types[i];
+ switch (type) {
+ case Boolean:
+ Boolean aBoolean = random.nextBoolean();
+ BooleanSerializerDeserializer.INSTANCE.serialize(aBoolean, fields[i].getDataOutput());
+ break;
+ case Double:
+ double aDouble = random.nextDouble();
+ DoubleSerializerDeserializer.INSTANCE.serialize(aDouble, fields[i].getDataOutput());
+ break;
+ case Integer64:
+ long aLong = random.nextLong();
+ Integer64SerializerDeserializer.INSTANCE.serialize(aLong, fields[i].getDataOutput());
+ break;
+ case String:
+ String aString = RandomStringUtils.randomAlphanumeric(stringFieldSizes);
+ stringSerde.serialize(aString, fields[i].getDataOutput());
+ break;
+ default:
+ break;
+ }
+ }
+ return tuple;
+ }
+
+ private class TestTupleReference implements ITupleReference {
+ private final GrowableArray[] fields;
+
+ private TestTupleReference(GrowableArray[] fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return fields.length;
+ }
+
+ @Override
+ public byte[] getFieldData(int fIdx) {
+
+ return fields[fIdx].getByteArray();
+ }
+
+ @Override
+ public int getFieldStart(int fIdx) {
+ return 0;
+ }
+
+ @Override
+ public int getFieldLength(int fIdx) {
+ return fields[fIdx].getLength();
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
new file mode 100644
index 0000000..2f712cc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.test.common.TestTupleGenerator;
+import org.apache.asterix.test.common.TestTupleGenerator.FieldType;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.PartitionWithMessageDataWriter;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ConnectorDescriptorWithMessagingTest {
+
+ // Tuples used in this test are made of Fields of {Integer,Double,Boolean,UTF8String};
+ private static final int NUMBER_OF_CONSUMERS = 5;
+ private static final int DEFAULT_FRAME_SIZE = 32768;
+ private static final int CURRENT_PRODUCER = 0;
+ private static final int STRING_FIELD_SIZES = 32;
+
+ @Test
+ public void testEmptyFrames() throws Exception {
+ try {
+ List<Integer> routing = Arrays.asList(0, 1, 2, 3, 4);
+ IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class);
+ ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing);
+ MToNPartitioningWithMessageConnectorDescriptor connector =
+ new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory);
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ VSizeFrame message = new VSizeFrame(ctx);
+ VSizeFrame tempBuffer = new VSizeFrame(ctx);
+ ctx.setSharedObject(message);
+ message.getBuffer().clear();
+ message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+ message.getBuffer().flip();
+ ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
+ Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
+ BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+ RecordDescriptor rDesc = new RecordDescriptor(serdes);
+ TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+ IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, CURRENT_PRODUCER,
+ NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+ partitioner.open();
+ FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+ List<TestFrameWriter> recipients = new ArrayList<>();
+ for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
+ recipients.add((TestFrameWriter) writer);
+ }
+ partitioner.flush();
+ for (TestFrameWriter writer : recipients) {
+ Assert.assertEquals(writer.nextFrameCount(), 1);
+ fta.reset(writer.getLastFrame());
+ Assert.assertEquals(fta.getTupleCount(), 1);
+ FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+ Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+ MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ }
+ message.getBuffer().clear();
+ message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+ message.getBuffer().flip();
+ partitioner.flush();
+ for (TestFrameWriter writer : recipients) {
+ Assert.assertEquals(writer.nextFrameCount(), 2);
+ fta.reset(writer.getLastFrame());
+ Assert.assertEquals(fta.getTupleCount(), 1);
+ FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+ Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
+ MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ }
+
+ message.getBuffer().clear();
+ message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+ message.getBuffer().flip();
+ partitioner.flush();
+ for (TestFrameWriter writer : recipients) {
+ Assert.assertEquals(writer.nextFrameCount(), 3);
+ fta.reset(writer.getLastFrame());
+ Assert.assertEquals(fta.getTupleCount(), 1);
+ FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+ Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+ MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ }
+ partitioner.close();
+ for (TestFrameWriter writer : recipients) {
+ Assert.assertEquals(writer.nextFrameCount(), 4);
+ Assert.assertEquals(writer.closeCount(), 1);
+ }
+
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
+ }
+
+ @Test
+ public void testMessageLargerThanEmptyFrame() throws Exception {
+ try {
+ List<Integer> routing = Arrays.asList(0, 1, 2, 3, 4);
+ IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class);
+ ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing);
+ MToNPartitioningWithMessageConnectorDescriptor connector =
+ new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory);
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ VSizeFrame message = new VSizeFrame(ctx);
+ VSizeFrame tempBuffer = new VSizeFrame(ctx);
+ ctx.setSharedObject(message);
+ writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE + 1);
+ ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
+ Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
+ BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+ RecordDescriptor rDesc = new RecordDescriptor(serdes);
+ TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+ IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, CURRENT_PRODUCER,
+ NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+ partitioner.open();
+ FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+ List<TestFrameWriter> recipients = new ArrayList<>();
+ for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
+ recipients.add((TestFrameWriter) writer);
+ }
+ partitioner.flush();
+ for (TestFrameWriter writer : recipients) {
+ Assert.assertEquals(writer.nextFrameCount(), 1);
+ fta.reset(writer.getLastFrame());
+ Assert.assertEquals(fta.getTupleCount(), 1);
+ FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+ Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+ MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ }
+ message.getBuffer().clear();
+ message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+ message.getBuffer().flip();
+ partitioner.flush();
+ for (TestFrameWriter writer : recipients) {
+ Assert.assertEquals(writer.nextFrameCount(), 2);
+ fta.reset(writer.getLastFrame());
+ Assert.assertEquals(fta.getTupleCount(), 1);
+ FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+ Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
+ MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ }
+ message.getBuffer().clear();
+ message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+ message.getBuffer().flip();
+ partitioner.flush();
+ for (TestFrameWriter writer : recipients) {
+ Assert.assertEquals(writer.nextFrameCount(), 3);
+ fta.reset(writer.getLastFrame());
+ Assert.assertEquals(fta.getTupleCount(), 1);
+ FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+ Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+ MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ }
+ partitioner.close();
+ for (TestFrameWriter writer : recipients) {
+ Assert.assertEquals(writer.nextFrameCount(), 4);
+ Assert.assertEquals(writer.closeCount(), 1);
+ }
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
+ }
+
+ private void writeRandomMessage(VSizeFrame frame, byte tag, int size) throws HyracksDataException {
+ // We subtract 2, 1 for the tag, and one for the end offset
+ Random random = new Random();
+ byte[] bytes = new byte[size - 2];
+ random.nextBytes(bytes);
+ int frameSize = FrameHelper.calcAlignedFrameSizeToStore(1, size - 1, DEFAULT_FRAME_SIZE);
+ frame.ensureFrameSize(frameSize);
+ frame.getBuffer().clear();
+ frame.getBuffer().put(tag);
+ frame.getBuffer().put(bytes);
+ frame.getBuffer().flip();
+ }
+
+ @Test
+ public void testMessageLargerThanSome() throws Exception {
+ try {
+ // Routing will be to 1, 3, and 4 only. 0 and 2 will receive no tuples
+ List<Integer> routing = Arrays.asList(1, 3, 4);
+ IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class);
+ ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing);
+ MToNPartitioningWithMessageConnectorDescriptor connector =
+ new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory);
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ VSizeFrame message = new VSizeFrame(ctx);
+ VSizeFrame tempBuffer = new VSizeFrame(ctx);
+ ctx.setSharedObject(message);
+ message.getBuffer().clear();
+ writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE);
+ ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
+ Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
+ BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+ FieldType[] types = { FieldType.Integer64, FieldType.Double, FieldType.Boolean, FieldType.String };
+ RecordDescriptor rDesc = new RecordDescriptor(serdes);
+ TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+ PartitionWithMessageDataWriter partitioner =
+ (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+ CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+ partitioner.open();
+ FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+ List<TestFrameWriter> recipients = new ArrayList<>();
+ for (int i = 0; i < partitionWriterFactory.getWriters().values().size(); i++) {
+ recipients.add(partitionWriterFactory.getWriters().get(i));
+ }
+ TestTupleGenerator ttg = new TestTupleGenerator(types, STRING_FIELD_SIZES, true);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender appender = new FrameTupleAppender(frame);
+ ITupleReference tuple = ttg.next();
+ while (appender.append(tuple)) {
+ tuple = ttg.next();
+ }
+ partitioner.nextFrame(frame.getBuffer());
+ partitioner.flush();
+ Assert.assertEquals(partitionWriterFactory.getWriters().get(0).nextFrameCount(), 1);
+ Assert.assertEquals(partitionWriterFactory.getWriters().get(1).nextFrameCount(), 2);
+ Assert.assertEquals(partitionWriterFactory.getWriters().get(2).nextFrameCount(), 1);
+ Assert.assertEquals(partitionWriterFactory.getWriters().get(3).nextFrameCount(), 2);
+ Assert.assertEquals(partitionWriterFactory.getWriters().get(4).nextFrameCount(), 2);
+ for (TestFrameWriter writer : recipients) {
+ fta.reset(writer.getLastFrame());
+ Assert.assertEquals(fta.getTupleCount(), 1);
+ FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+ Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+ MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ }
+ partitioner.close();
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
+ }
+
+ @Test
+ public void testMessageFitsWithTuples() throws Exception {
+ try {
+ // Routing will be round robin
+ List<Integer> routing = Arrays.asList(0, 1, 2, 3, 4);
+ IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class);
+ ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing);
+ MToNPartitioningWithMessageConnectorDescriptor connector =
+ new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory);
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ VSizeFrame message = new VSizeFrame(ctx);
+ VSizeFrame tempBuffer = new VSizeFrame(ctx);
+ ctx.setSharedObject(message);
+ message.getBuffer().clear();
+ message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+ message.getBuffer().flip();
+ ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
+ Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
+ BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+ FieldType[] types = { FieldType.Integer64, FieldType.Double, FieldType.Boolean, FieldType.String };
+ RecordDescriptor rDesc = new RecordDescriptor(serdes);
+ TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+ PartitionWithMessageDataWriter partitioner =
+ (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+ CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+ partitioner.open();
+ FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+ List<TestFrameWriter> recipients = new ArrayList<>();
+ for (int i = 0; i < partitionWriterFactory.getWriters().values().size(); i++) {
+ recipients.add(partitionWriterFactory.getWriters().get(i));
+ }
+ TestTupleGenerator ttg = new TestTupleGenerator(types, STRING_FIELD_SIZES, true);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender appender = new FrameTupleAppender(frame);
+ for (int count = 0; count < NUMBER_OF_CONSUMERS; count++) {
+ ITupleReference tuple = ttg.next();
+ appender.append(tuple);
+ }
+ partitioner.nextFrame(frame.getBuffer());
+ partitioner.flush();
+ Assert.assertEquals(partitionWriterFactory.getWriters().get(0).nextFrameCount(), 1);
+ Assert.assertEquals(partitionWriterFactory.getWriters().get(1).nextFrameCount(), 1);
+ Assert.assertEquals(partitionWriterFactory.getWriters().get(2).nextFrameCount(), 1);
+ Assert.assertEquals(partitionWriterFactory.getWriters().get(3).nextFrameCount(), 1);
+ Assert.assertEquals(partitionWriterFactory.getWriters().get(4).nextFrameCount(), 1);
+ for (TestFrameWriter writer : recipients) {
+ fta.reset(writer.getLastFrame());
+ Assert.assertEquals(fta.getTupleCount(), 2);
+ FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+ Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
+ MessagingFrameTupleAppender.getMessageType(tempBuffer));
+ }
+ partitioner.close();
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java
new file mode 100644
index 0000000..385f6a2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TestPartitionComputerFactory implements ITuplePartitionComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final List<Integer> destinations;
+
+ /*
+ * For test purposes, this partition computer produces partitions according to a passed in
+ * list of integers.
+ */
+ public TestPartitionComputerFactory(List<Integer> destinations) {
+ this.destinations = destinations;
+ }
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
+ private final List<Integer> destinations =
+ new ArrayList<Integer>(TestPartitionComputerFactory.this.destinations);
+ private Iterator<Integer> iterator = destinations.iterator();
+
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ if (destinations.size() == 0) {
+ return 0;
+ }
+ while (!iterator.hasNext()) {
+ iterator = destinations.iterator();
+ }
+ return iterator.next();
+ }
+ };
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java
new file mode 100644
index 0000000..4b4c722
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.TestFrameWriter;
+
+/*
+ * A partition writer factory that is used for testing partitioners
+ */
+public class TestPartitionWriterFactory implements IPartitionWriterFactory {
+ private HashMap<Integer, TestFrameWriter> writers = new HashMap<>();
+
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ // The created writers must retain a deep copy of the input frame
+ writers.put(receiverIndex, FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), true));
+ return writers.get(receiverIndex);
+ }
+
+ public HashMap<Integer, TestFrameWriter> getWriters() {
+ return writers;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
similarity index 70%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
rename to asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
index 68783ca..536bf3a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
@@ -16,14 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.util;
+package org.apache.asterix.test.dataflow;
-public class FeedMessageUtils {
- public enum MessageType {
- NULL,
- ACK_REQUEST
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class TestRecordDescriptorFactory {
+ public RecordDescriptor createRecordDescriptor(ISerializerDeserializer<?>... serdes) {
+ return null;
}
-
- public static final byte NULL_FEED_MESSAGE = (byte) MessageType.NULL.ordinal();
- public static final byte ACK_REQ_FEED_MESSAGE = (byte) MessageType.ACK_REQUEST.ordinal();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index b09bef9..87daffa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -67,18 +67,7 @@
continue;
}
tb.reset();
- try {
- dataParser.parse(record, tb.getDataOutput());
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
- feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
- continue;
- }
- tb.addFieldEndOffset();
- addMetaPart(tb, record);
- addPrimaryKeys(tb, record);
- tupleForwarder.addTuple(tb);
+ parseAndForward(record);
}
} catch (InterruptedException e) {
//TODO: Find out what could cause an interrupted exception beside termination of a job/feed
@@ -107,6 +96,23 @@
}
}
+ private void parseAndForward(IRawRecord<? extends T> record) throws IOException {
+ synchronized (dataParser) {
+ try {
+ dataParser.parse(record, tb.getDataOutput());
+ } catch (Exception e) {
+ LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
+ feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
+ // continue the outer loop
+ return;
+ }
+ tb.addFieldEndOffset();
+ addMetaPart(tb, record);
+ addPrimaryKeys(tb, record);
+ tupleForwarder.addTuple(tb);
+ }
+ }
+
protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index ed5e355..0d72682 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -31,9 +31,8 @@
private final AsterixInputStream stream;
public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
- FeedLogManager feedLogManager, int numOfFields, IStreamDataParser streamParser,
- AsterixInputStream inputStream) {
- super(ctx, tupleForwarder, feedLogManager, numOfFields);
+ FeedLogManager feedLogManager, IStreamDataParser streamParser, AsterixInputStream inputStream) {
+ super(ctx, tupleForwarder, feedLogManager, 1);
this.dataParser = streamParser;
this.stream = inputStream;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 7ae2f41..f1eb870 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -19,14 +19,12 @@
package org.apache.asterix.external.dataflow;
import java.io.IOException;
-import java.nio.ByteBuffer;
import javax.annotation.Nonnull;
import org.apache.asterix.external.api.ITupleForwarder;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.FeedMessageUtils;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -35,6 +33,7 @@
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
public class FeedTupleForwarder implements ITupleForwarder {
@@ -60,10 +59,10 @@
this.writer = writer;
this.appender = new FrameTupleAppender(frame);
// Set null feed message
- ByteBuffer message = (ByteBuffer) ctx.getSharedObject();
+ VSizeFrame message = (VSizeFrame) ctx.getSharedObject();
// a null message
- message.put(FeedMessageUtils.NULL_FEED_MESSAGE);
- message.flip();
+ message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+ message.getBuffer().flip();
initialized = true;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 5e8c022..6cdc2af 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -18,16 +18,15 @@
*/
package org.apache.asterix.external.feed.runtime;
-import java.nio.ByteBuffer;
import java.util.logging.Level;
import org.apache.asterix.external.feed.api.ISubscriberRuntime;
import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
public class IngestionRuntime extends SubscribableRuntime {
@@ -48,7 +47,7 @@
dWriter.subscribe(collector);
subscribers.add(collectionRuntime);
if (numSubscribers == 0) {
- ctx.setSharedObject(ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE));
+ ctx.setSharedObject(new VSizeFrame(ctx));
collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
adapterRuntimeManager.start();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 716468e..8d8bc28 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -34,13 +34,13 @@
import org.apache.asterix.external.feed.runtime.FeedRuntime;
import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
import org.apache.asterix.external.util.FeedUtils;
+import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
/*
@@ -86,7 +86,7 @@
private final FeedRuntimeType runtimeType = FeedRuntimeType.COMPUTE;
- private ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
+ private final VSizeFrame message;
private final FeedMetaOperatorDescriptor opDesc;
@@ -111,6 +111,7 @@
this.connectionId = feedConnectionId;
this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
.getApplicationObject()).getFeedManager();
+ this.message = new VSizeFrame(ctx);
ctx.setSharedObject(message);
this.opDesc = feedMetaOperatorDescriptor;
this.recordDescProvider = recordDescProvider;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index b79707b..47df39e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -35,13 +35,13 @@
import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -87,7 +87,7 @@
private final String targetId;
- private final ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
+ private final VSizeFrame message;
private final IRecordDescriptorProvider recordDescProvider;
@@ -106,6 +106,7 @@
this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
.getApplicationObject()).getFeedManager();
this.targetId = targetId;
+ this.message = new VSizeFrame(ctx);
ctx.setSharedObject(message);
this.recordDescProvider = recordDescProvider;
this.opDesc = feedMetaOperatorDescriptor;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 6ba27d8..50ebb71 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -48,7 +48,6 @@
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -73,8 +72,8 @@
DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
recordReader, ((IIndexingDatasource) recordReader).getIndexer());
} else if (isFeed) {
- FeedTupleForwarder tupleForwarder = (FeedTupleForwarder) DataflowUtils
- .getTupleForwarder(configuration, feedLogManager);
+ FeedTupleForwarder tupleForwarder =
+ (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager);
boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration);
boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration);
if (isRecordWithMeta) {
@@ -108,7 +107,7 @@
if (isFeed) {
return new FeedStreamDataFlowController(ctx,
(FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager),
- feedLogManager, FeedUtils.getNumOfFields(configuration), streamParser, stream);
+ feedLogManager, streamParser, stream);
} else {
return new StreamDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, null),
streamParser);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 51e7e72..8228c39 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -32,13 +32,14 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.util.IntSerDeUtils;
import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.util.IntSerDeUtils;
public class FeedUtils {
private static String prepareDataverseFeedName(String dataverseName, String feedName) {
@@ -49,8 +50,8 @@
ClusterPartition partition) {
File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
- String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
- partition.getPartitionId());
+ String storagePartitionPath =
+ StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition.getPartitionId());
// Note: feed adapter instances in a single node share the feed logger
// format: 'storage dir name'/partition_#/dataverse/feed/node
File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName);
@@ -88,22 +89,21 @@
feedLogFileSplit.getIODeviceId(), ctx.getIOManager()).getFile());
}
- public static void processFeedMessage(ByteBuffer input, ByteBuffer message, FrameTupleAccessor fta) {
+ public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta)
+ throws HyracksDataException {
// read the message and reduce the number of tuples
fta.reset(input);
int tc = fta.getTupleCount() - 1;
int offset = fta.getTupleStartOffset(tc);
int len = fta.getTupleLength(tc);
- message.clear();
- message.put(input.array(), offset, len);
- message.flip();
+ int newSize = FrameHelper.calcAlignedFrameSizeToStore(1, len, message.getMinSize());
+ message.ensureFrameSize(newSize);
+ message.getBuffer().clear();
+ message.getBuffer().put(input.array(), offset, len);
+ message.getBuffer().flip();
IntSerDeUtils.putInt(input.array(), FrameHelper.getTupleCountOffset(input.capacity()), tc);
}
- public static int getNumOfFields(Map<String, String> configuration) {
- return 1;
- }
-
public static String getFeedMetaTypeName(Map<String, String> configuration) {
return configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME);
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index c00db7a..3becd96 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -118,7 +118,8 @@
FeedPolicyAccessor fpa =
createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ TestFrameWriter writer =
+ FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -159,7 +160,8 @@
FeedPolicyAccessor fpa =
createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ TestFrameWriter writer =
+ FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -207,7 +209,7 @@
FeedPolicyAccessor fpa =
createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
ConcurrentFramePool framePool =
@@ -317,7 +319,7 @@
FeedPolicyAccessor fpa =
createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
ConcurrentFramePool framePool =
@@ -390,7 +392,7 @@
// Spill budget = Memory budget, No discard
FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
ConcurrentFramePool framePool =
@@ -452,7 +454,7 @@
// Spill budget = Memory budget, No discard
FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
ConcurrentFramePool framePool =
@@ -509,7 +511,7 @@
FeedPolicyAccessor fpa =
createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
@@ -553,7 +555,8 @@
// No spill, No discard
FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ TestFrameWriter writer =
+ FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -593,7 +596,8 @@
// No spill, No discard
FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ TestFrameWriter writer =
+ FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -632,7 +636,7 @@
// No spill, No discard
FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
@@ -682,7 +686,7 @@
FeedPolicyAccessor fpa =
createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
@@ -732,7 +736,7 @@
// No spill, No discard
FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
index 4bddfa9..c9cc71e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
@@ -33,7 +33,7 @@
}
public static TestFrameWriter create(Collection<FrameWriterOperation> exceptionThrowingOperations,
- Collection<FrameWriterOperation> errorThrowingOperations) {
+ Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
CountAnswer openAnswer =
createAnswer(FrameWriterOperation.Open, exceptionThrowingOperations, errorThrowingOperations);
CountAnswer nextAnswer =
@@ -44,7 +44,7 @@
createAnswer(FrameWriterOperation.Fail, exceptionThrowingOperations, errorThrowingOperations);
CountAnswer closeAnswer =
createAnswer(FrameWriterOperation.Close, exceptionThrowingOperations, errorThrowingOperations);
- return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer);
+ return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer, deepCopyInputFrames);
}
public static CountAnswer createAnswer(FrameWriterOperation operation,
@@ -59,7 +59,7 @@
}
}
- public static TestControlledFrameWriter create(int initialFrameSize) {
- return new TestControlledFrameWriter(initialFrameSize);
+ public static TestControlledFrameWriter create(int initialFrameSize, boolean deepCopyInputFrames) {
+ return new TestControlledFrameWriter(initialFrameSize, deepCopyInputFrames);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
index bf168c2..b98bcf7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
@@ -30,8 +30,9 @@
private volatile int currentMultiplier = 0;
private volatile int kicks = 0;
- public TestControlledFrameWriter(int initialFrameSize) {
- super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer());
+ public TestControlledFrameWriter(int initialFrameSize, boolean deepCopyInputFrames) {
+ super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(),
+ deepCopyInputFrames);
this.initialFrameSize = initialFrameSize;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
index b3492fe..065e64d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
@@ -29,19 +29,27 @@
private final CountAnswer flushAnswer;
private final CountAnswer failAnswer;
private final CountAnswer closeAnswer;
+ private static final int BYTES32KB = 32768;
private long openDuration = 0L;
private long nextDuration = 0L;
private long flushDuration = 0L;
private long failDuration = 0L;
private long closeDuration = 0L;
+ // If copyFrames was set, we take a copy of the frame, otherwise, we simply point lastFrame to it
+ private final boolean deepCopyFrames;
+ private ByteBuffer lastFrame;
public TestFrameWriter(CountAnswer openAnswer, CountAnswer nextAnswer, CountAnswer flushAnswer,
- CountAnswer failAnswer, CountAnswer closeAnswer) {
+ CountAnswer failAnswer, CountAnswer closeAnswer, boolean deepCopyFrames) {
this.openAnswer = openAnswer;
this.nextAnswer = nextAnswer;
this.closeAnswer = closeAnswer;
this.flushAnswer = flushAnswer;
this.failAnswer = failAnswer;
+ this.deepCopyFrames = deepCopyFrames;
+ if (deepCopyFrames) {
+ lastFrame = ByteBuffer.allocate(BYTES32KB);
+ }
}
@Override
@@ -56,6 +64,15 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (deepCopyFrames) {
+ if (lastFrame.capacity() != buffer.capacity()) {
+ lastFrame = ByteBuffer.allocate(buffer.capacity());
+ }
+ lastFrame.clear();
+ lastFrame.put(buffer.array());
+ } else {
+ lastFrame = buffer;
+ }
delay(nextDuration);
nextAnswer.call();
}
@@ -170,4 +187,8 @@
}
}
}
+
+ public ByteBuffer getLastFrame() {
+ return lastFrame;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 1623035..ef11b5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameTupleAppender;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.util.IntSerDeUtils;
public class FrameTupleAppender extends AbstractFrameAppender implements IFrameTupleAppender {
@@ -61,6 +62,30 @@
return false;
}
+ public boolean append(ITupleReference tuple) throws HyracksDataException {
+ int tupleSize = 0;
+ for (int i = 0; i < tuple.getFieldCount(); i++) {
+ tupleSize += tuple.getFieldLength(i);
+ }
+ if (canHoldNewTuple(tuple.getFieldCount(), tupleSize)) {
+ int offset = 0;
+ for (int i = 0; i < tuple.getFieldCount(); ++i) {
+ IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, offset);
+ System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), array,
+ tupleDataEndOffset + tuple.getFieldCount() * 4, tuple.getFieldLength(i));
+ offset += tuple.getFieldLength(i);
+ }
+ tupleDataEndOffset += tuple.getFieldCount() * 4 + tupleSize;
+ IntSerDeUtils.putInt(getBuffer().array(),
+ FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+ ++tupleCount;
+ IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()),
+ tupleCount);
+ return true;
+ }
+ return false;
+ }
+
@Override
public boolean append(byte[] bytes, int offset, int length) throws HyracksDataException {
if (canHoldNewTuple(0, length)) {
@@ -252,8 +277,8 @@
int fEndOffset = 0;
for (int i = 0; i < fields.length; ++i) {
int fSrcStart = tStartOffset + fSrcSlotsLength + accessor.getFieldStartOffset(tIndex, fields[i]);
- int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
- - accessor.getFieldStartOffset(tIndex, fields[i]);
+ int fLen =
+ accessor.getFieldEndOffset(tIndex, fields[i]) - accessor.getFieldStartOffset(tIndex, fields[i]);
System.arraycopy(accessor.getBuffer().array(), fSrcStart, array,
tupleDataEndOffset + fTargetSlotsLength + fStartOffset, fLen);
fEndOffset += fLen;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
index 77495dd..e57e12d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
@@ -71,19 +71,6 @@
FrameUtils.appendSkipEmptyFieldToWriter(outputWriter, frameTupleAppender, fieldSlots, bytes, offset, length);
}
- public void append(byte[] bytes, int offset, int length) throws HyracksDataException {
- FrameUtils.appendToWriter(outputWriter, frameTupleAppender, bytes, offset, length);
- }
-
- public void append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
- throws HyracksDataException {
- FrameUtils.appendToWriter(outputWriter, frameTupleAppender, tupleAccessor, tStartOffset, tEndOffset);
- }
-
- public void append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
- FrameUtils.appendToWriter(outputWriter, frameTupleAppender, tupleAccessor, tIndex);
- }
-
public void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
throws HyracksDataException {
FrameUtils.appendConcatToWriter(outputWriter, frameTupleAppender, accessor0, tIndex0, accessor1, tIndex1);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index 345c506..cae659d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -18,32 +18,81 @@
*/
package org.apache.hyracks.dataflow.common.io;
+import java.io.PrintStream;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.util.IntSerDeUtils;
+/**
+ * A frame tuple appender that appends messages stored in the task context when pushing frames forward
+ * This appender must only be used on network boundary
+ */
public class MessagingFrameTupleAppender extends FrameTupleAppender {
- public static final int MAX_MESSAGE_SIZE = 100;
private final IHyracksTaskContext ctx;
+ private static final int NULL_MESSAGE_SIZE = 1;
+ public static final byte NULL_FEED_MESSAGE = 0x01;
+ public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
+ public static final byte SNAPSHOT_MESSAGE = 0x03;
public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
this.ctx = ctx;
}
+ public static void printMessage(VSizeFrame message, PrintStream out) throws HyracksDataException {
+ out.println(getMessageString(message));
+ }
+
+ public static String getMessageString(VSizeFrame message) throws HyracksDataException {
+ StringBuilder aString = new StringBuilder();
+ aString.append("Message Type: ");
+ switch (getMessageType(message)) {
+ case NULL_FEED_MESSAGE:
+ aString.append("Null, ");
+ break;
+ case ACK_REQ_FEED_MESSAGE:
+ aString.append("Ack Request, ");
+ break;
+ case SNAPSHOT_MESSAGE:
+ aString.append("Snapshot, ");
+ break;
+ default:
+ aString.append("Unknown, ");
+ break;
+ }
+ aString.append("Message Length: ");
+ int messageLength = message.getBuffer().remaining();
+ aString.append(messageLength);
+ return aString.toString();
+ }
+
+ public static byte getMessageType(VSizeFrame message) throws HyracksDataException {
+ switch (message.getBuffer().array()[0]) {
+ case NULL_FEED_MESSAGE:
+ return NULL_FEED_MESSAGE;
+ case ACK_REQ_FEED_MESSAGE:
+ return ACK_REQ_FEED_MESSAGE;
+ case SNAPSHOT_MESSAGE:
+ return SNAPSHOT_MESSAGE;
+ default:
+ throw new HyracksDataException("Unknown message type");
+ }
+ }
+
@Override
protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
- if (hasEnoughSpace(fieldCount, dataLength + MAX_MESSAGE_SIZE)) {
+ if (hasEnoughSpace(fieldCount + 1, dataLength + NULL_MESSAGE_SIZE)) {
return true;
}
if (tupleCount == 0) {
- frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, dataLength + MAX_MESSAGE_SIZE,
- frame.getMinSize()));
+ frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount + 1,
+ dataLength + NULL_MESSAGE_SIZE, frame.getMinSize()));
reset(frame.getBuffer(), true);
return true;
}
@@ -52,13 +101,32 @@
@Override
public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
- appendMessage((ByteBuffer) ctx.getSharedObject());
+ // If message fits, we append it, otherwise, we append a null message, then send a message only
+ // frame with the message
+ ByteBuffer message = ((VSizeFrame) ctx.getSharedObject()).getBuffer();
+ int messageSize = message.limit() - message.position();
+ if (hasEnoughSpace(1, messageSize)) {
+ appendMessage(message);
+ forward(outWriter);
+ } else {
+ if (tupleCount > 0) {
+ appendNullMessage();
+ forward(outWriter);
+ }
+ if (!hasEnoughSpace(1, messageSize)) {
+ frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
+ reset(frame.getBuffer(), true);
+ }
+ appendMessage(message);
+ forward(outWriter);
+ }
+ }
+
+ private void forward(IFrameWriter outWriter) throws HyracksDataException {
getBuffer().clear();
outWriter.nextFrame(getBuffer());
- if (clearFrame) {
- frame.reset();
- reset(getBuffer(), true);
- }
+ frame.reset();
+ reset(getBuffer(), true);
}
private void appendMessage(ByteBuffer message) {
@@ -69,4 +137,13 @@
++tupleCount;
IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
}
+
+ private void appendNullMessage() {
+ array[tupleDataEndOffset] = NULL_FEED_MESSAGE;
+ tupleDataEndOffset++;
+ IntSerDeUtils.putInt(getBuffer().array(),
+ FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+ ++tupleCount;
+ IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
index e90d8b0..f6996f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -30,6 +30,11 @@
private static final long serialVersionUID = 1L;
+ /**
+ * This connector enable sending messages alongside data tuples. Messages are sent on flush() calls.
+ * It broadcasts messages to all consumers. If the message doesn't fit in the current frame for a specific
+ * receiver, the current frame is sent and a subsequent one with the message only is sent
+ */
public MToNPartitioningWithMessageConnectorDescriptor(IConnectorDescriptorRegistry spec,
ITuplePartitionComputerFactory tpcf) {
super(spec, tpcf);
@@ -38,7 +43,7 @@
@Override
public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
tpcf.createPartitioner());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 98bd860..f2ec64b 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -44,6 +44,7 @@
private WorkspaceFileFactory fileFactory;
private Map<Object, IStateObject> stateObjectMap = new HashMap<>();
+ private Object sharedObject;
public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) {
this.jobletContext = jobletContext;
@@ -149,10 +150,11 @@
@Override
public Object getSharedObject() {
- return null;
+ return sharedObject;
}
@Override
public void setSharedObject(Object sharedObject) {
+ this.sharedObject = sharedObject;
}
}