[ASTERIXDB-1443][FEED] Remove Frame Distributor
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- FrameDistributor and DistributeFeedFrameWriter are not used
anymore.
Change-Id: I27c1ff99ce797923dd709d181387560e4f9448a5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1853
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xkkwww@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
deleted file mode 100644
index ae2e0b9..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Provides mechanism for distributing the frames, as received from an operator to a
- * set of registered readers. Each reader typically operates at a different pace. Readers
- * are isolated from each other to ensure that a slow reader does not impact the progress of
- * others.
- **/
-public class DistributeFeedFrameWriter implements IFrameWriter {
-
- /** A unique identifier for the feed to which the incoming tuples belong. **/
- private final EntityId feedId;
-
- /**
- * An instance of FrameDistributor that provides the mechanism for distributing a frame to multiple readers, each
- * operating in isolation.
- **/
- private final FrameDistributor frameDistributor;
-
- /** The original frame writer instantiated as part of job creation **/
- private final IFrameWriter writer;
-
- /** The feed operation whose output is being distributed by the DistributeFeedFrameWriter **/
- private final FeedRuntimeType feedRuntimeType;
-
- /** The value of the partition 'i' if this is the i'th instance of the associated operator **/
- private final int partition;
-
- public DistributeFeedFrameWriter(EntityId feedId, IFrameWriter writer, FeedRuntimeType feedRuntimeType,
- int partition) throws IOException {
- this.feedId = feedId;
- this.frameDistributor = new FrameDistributor();
- this.feedRuntimeType = feedRuntimeType;
- this.partition = partition;
- this.writer = writer;
- }
-
- /**
- * @param fpa
- * Feed policy accessor
- * @param nextOnlyWriter
- * the writer which will deliver the buffers
- * @param connectionId
- * (Dataverse - Dataset - Feed)
- * @return A frame collector.
- * @throws HyracksDataException
- */
- public void subscribe(FeedFrameCollector collector) throws HyracksDataException {
- frameDistributor.registerFrameCollector(collector);
- }
-
- public void unsubscribeFeed(FeedConnectionId connectionId) throws HyracksDataException {
- frameDistributor.deregisterFrameCollector(connectionId);
- }
-
- @Override
- public void close() throws HyracksDataException {
- try {
- frameDistributor.close();
- } finally {
- writer.close();
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
- public void nextFrame(ByteBuffer frame) throws HyracksDataException {
- frameDistributor.nextFrame(frame);
- }
-
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- }
-
- @Override
- public String toString() {
- return feedId.toString() + feedRuntimeType + "[" + partition + "]";
- }
-
- @Override
- public void flush() throws HyracksDataException {
- frameDistributor.flush();
- }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
deleted file mode 100644
index 6ca4b77..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-public class FrameDistributor implements IFrameWriter {
-
- public static final Logger LOGGER = Logger.getLogger(FrameDistributor.class.getName());
- /** A map storing the registered frame readers ({@code FeedFrameCollector}. **/
- private final Map<FeedConnectionId, FeedFrameCollector> registeredCollectors;
- private Throwable rootFailureCause = null;
-
- public FrameDistributor() throws HyracksDataException {
- this.registeredCollectors = new HashMap<FeedConnectionId, FeedFrameCollector>();
- }
-
- public synchronized void registerFrameCollector(FeedFrameCollector frameCollector) throws HyracksDataException {
- if (rootFailureCause != null) {
- throw new RuntimeDataException(ErrorCode.FEED_DATAFLOW_FRAME_DISTR_REGISTER_FAILED_DATA_PROVIDER,
- rootFailureCause);
- }
- // registering a new collector.
- try {
- frameCollector.open();
- } catch (Throwable th) {
- rootFailureCause = th;
- try {
- frameCollector.fail();
- } catch (Throwable failThrowable) {
- th.addSuppressed(failThrowable);
- } finally {
- try {
- frameCollector.close();
- } catch (Throwable closeThrowable) {
- th.addSuppressed(closeThrowable);
- }
- }
- throw th;
- }
- registeredCollectors.put(frameCollector.getConnectionId(), frameCollector);
- }
-
- public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) throws HyracksDataException {
- deregisterFrameCollector(frameCollector.getConnectionId());
- }
-
- public synchronized void deregisterFrameCollector(FeedConnectionId connectionId) throws HyracksDataException {
- if (rootFailureCause != null) {
- throw new RuntimeDataException(ErrorCode.FEED_DATAFLOW_FRAME_DISTR_REGISTER_FAILED_DATA_PROVIDER,
- rootFailureCause);
- }
- FeedFrameCollector frameCollector = removeFrameCollector(connectionId);
- try {
- frameCollector.close();
- } catch (Throwable th) {
- rootFailureCause = th;
- throw th;
- }
- }
-
- public synchronized FeedFrameCollector removeFrameCollector(FeedConnectionId connectionId) {
- return registeredCollectors.remove(connectionId);
- }
-
- /*
- * Fix. What should be done?:
- * 0. mark failure so no one can subscribe or unsubscribe.
- * 1. Throw the throwable.
- * 2. when fail() is called, call fail on all subscribers
- * 3. close all the subscribers.
- * (non-Javadoc)
- * @see org.apache.hyracks.api.comm.IFrameWriter#nextFrame(java.nio.ByteBuffer)
- */
- @Override
- public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
- if (rootFailureCause != null) {
- throw new HyracksDataException(rootFailureCause);
- }
- for (FeedFrameCollector collector : registeredCollectors.values()) {
- try {
- collector.nextFrame(frame);
- } catch (Throwable th) {
- rootFailureCause = th;
- throw th;
- }
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- Collection<FeedFrameCollector> collectors = registeredCollectors.values();
- Iterator<FeedFrameCollector> it = collectors.iterator();
- while (it.hasNext()) {
- FeedFrameCollector collector = it.next();
- try {
- collector.fail();
- } catch (Throwable th) {
- while (it.hasNext()) {
- FeedFrameCollector innerCollector = it.next();
- try {
- innerCollector.fail();
- } catch (Throwable innerTh) {
- th.addSuppressed(innerTh);
- }
- }
- throw th;
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- Collection<FeedFrameCollector> collectors = registeredCollectors.values();
- Iterator<FeedFrameCollector> it = collectors.iterator();
- while (it.hasNext()) {
- FeedFrameCollector collector = it.next();
- try {
- collector.close();
- } catch (Throwable th) {
- while (it.hasNext()) {
- FeedFrameCollector innerCollector = it.next();
- try {
- innerCollector.close();
- } catch (Throwable innerTh) {
- th.addSuppressed(innerTh);
- } finally {
- innerCollector.setState(State.FINISHED);
- }
- }
- // resume here
- throw th;
- } finally {
- collector.setState(State.FINISHED);
- }
- }
- }
-
- @Override
- public void flush() throws HyracksDataException {
- if (rootFailureCause != null) {
- throw new HyracksDataException(rootFailureCause);
- }
- for (FeedFrameCollector collector : registeredCollectors.values()) {
- try {
- collector.flush();
- } catch (Throwable th) {
- rootFailureCause = th;
- throw th;
- }
- }
- }
-
- @Override
- public void open() throws HyracksDataException {
- // Nothing to do here :)
- }
-}