Add a test case for Hyracks error reporting.
Change-Id: Ic3ac4c7b0a07eacbc448ce5724085eb8d1e6bc4d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/527
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
new file mode 100644
index 0000000..fb816da
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.rewriting;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class ErrorReportingTest extends AbstractIntegrationTest {
+
+ static final String EXPECTED_ERROR_MESSAGE = "expected failure";
+
+ @Test
+ public void testInitialize() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ TestSourceOperatorDescriptor ets1 = new TestSourceOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ets1, NC1_ID);
+
+ TestSourceOperatorDescriptor ets2 = new TestSourceOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ets2, NC1_ID);
+
+ TestSourceOperatorDescriptor ets3 = new TestSourceOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ets3, NC1_ID);
+
+ ExceptionRaisingOperatorDescriptor tc = new ExceptionRaisingOperatorDescriptor(spec, 3);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, tc, NC1_ID);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), ets1, 0, tc, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), ets2, 0, tc, 1);
+ spec.connect(new OneToOneConnectorDescriptor(spec), ets3, 0, tc, 2);
+ spec.addRoot(tc);
+
+ try {
+ runTest(spec);
+ } catch (Exception e) {
+ Throwable t = getRootCause(e);
+ Assert.assertTrue(t.getMessage().equals(EXPECTED_ERROR_MESSAGE));
+ }
+ }
+
+ private Throwable getRootCause(Throwable t) {
+ Throwable cause = t.getCause();
+ if (cause == null) {
+ return t;
+ }
+ return getRootCause(cause);
+ }
+}
+
+class TestSourceOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public TestSourceOperatorDescriptor(JobSpecification spec) {
+ super(spec, 0, 1);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private ByteBuffer frame = ctx.allocateFrame();
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ try {
+ writer.open();
+ writer.nextFrame(frame);
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
+ };
+ }
+
+}
+
+class ExceptionRaisingOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public ExceptionRaisingOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity) {
+ super(spec, inputArity, 0);
+ }
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ ExceptionRaisingActivityNode tca = new ExceptionRaisingActivityNode(new ActivityId(getOperatorId(), 0));
+ builder.addActivity(this, tca);
+ for (int i = 0; i < inputArity; ++i) {
+ builder.addSourceEdge(i, tca, i);
+ }
+ }
+
+ private class ExceptionRaisingActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public ExceptionRaisingActivityNode(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ return new IOperatorNodePushable() {
+
+ @Override
+ public void initialize() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+
+ }
+
+ @Override
+ public int getInputArity() {
+ return inputArity;
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+ throws HyracksDataException {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(final int index) {
+ return new IFrameWriter() {
+ @Override
+ public void open() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (index == inputArity - 1) {
+ throw new HyracksDataException(ErrorReportingTest.EXPECTED_ERROR_MESSAGE);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+ };
+ }
+
+ @Override
+ public String getDisplayName() {
+ return "Exception-Raising-Activity";
+ }
+
+ };
+ }
+ }
+
+}