ASTERIXDB-1176: remove unused feeds code
Change-Id: I2de2d7c7fd816ddbd53a80c855f64923c02efe35
Reviewed-on: https://asterix-gerrit.ics.uci.edu/494
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedMessageService.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedMessageService.java
deleted file mode 100644
index 46dd029..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedMessageService.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.message;
-
-import java.net.Socket;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.feeds.FeedConstants;
-import org.apache.asterix.common.feeds.api.IFeedMessage;
-import org.apache.asterix.common.feeds.api.IFeedMessageService;
-
-/**
- * Sends feed report messages on behalf of an operator instance
- * to the SuperFeedManager associated with the feed.
- */
-public class FeedMessageService implements IFeedMessageService {
-
- private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
-
- private final LinkedBlockingQueue<String> inbox;
- private final FeedMessageHandler mesgHandler;
- private final String nodeId;
- private ExecutorService executor;
-
- public FeedMessageService(AsterixFeedProperties feedProperties, String nodeId, String ccClusterIp) {
- this.inbox = new LinkedBlockingQueue<String>();
- this.mesgHandler = new FeedMessageHandler(inbox, ccClusterIp, feedProperties.getFeedCentralManagerPort());
- this.nodeId = nodeId;
- this.executor = Executors.newSingleThreadExecutor();
- }
-
- public void start() throws Exception {
- executor.execute(mesgHandler);
- }
-
- public void stop() {
- synchronized (mesgHandler.getLock()) {
- executor.shutdownNow();
- }
- mesgHandler.stop();
- }
-
- @Override
- public void sendMessage(IFeedMessage message) {
- try {
- JSONObject obj = message.toJSON();
- obj.put(FeedConstants.MessageConstants.NODE_ID, nodeId);
- obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, message.getMessageType().name());
- inbox.add(obj.toString());
- } catch (JSONException jse) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("JSON Exception in parsing message " + message);
- }
- }
- }
-
- private static class FeedMessageHandler implements Runnable {
-
- private final LinkedBlockingQueue<String> inbox;
- private final String host;
- private final int port;
-
- private Socket cfmSocket;
- private Object lock;
-
- private static final byte[] EOL = "\n".getBytes();
-
- public FeedMessageHandler(LinkedBlockingQueue<String> inbox, String host, int port) {
- this.inbox = inbox;
- this.host = host;
- this.port = port;
- this.lock = new Object();
- }
-
- public void run() {
- try {
- cfmSocket = new Socket(host, port);
- if (cfmSocket != null) {
- while (true) {
- String message = inbox.take();
- synchronized (lock) {
- cfmSocket.getOutputStream().write(message.getBytes());
- cfmSocket.getOutputStream().write(EOL);
- }
- }
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to start feed message service");
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in handling incoming feed messages" + e.getMessage());
- }
- } finally {
- stop();
- }
-
- }
-
- public void stop() {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Stopping feed message handler");
- }
- if (cfmSocket != null) {
- try {
- cfmSocket.close();
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in closing socket " + e.getMessage());
- }
- }
- }
- }
-
- public Object getLock() {
- return lock;
- }
-
- }
-
-}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitAckMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
deleted file mode 100644
index 4c48a5e..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.message;
-
-import javax.xml.bind.DatatypeConverter;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedConstants;
-import org.apache.asterix.common.feeds.FeedId;
-
-public class FeedTupleCommitAckMessage extends FeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- private final FeedConnectionId connectionId;
- private int intakePartition;
- private int base;
- private byte[] commitAcks;
-
- public FeedTupleCommitAckMessage(FeedConnectionId connectionId, int intakePartition, int base, byte[] commitAcks) {
- super(MessageType.COMMIT_ACK);
- this.connectionId = connectionId;
- this.intakePartition = intakePartition;
- this.base = base;
- this.commitAcks = commitAcks;
- }
-
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject obj = new JSONObject();
- obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
- obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
- obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
- obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
- obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
- obj.put(FeedConstants.MessageConstants.BASE, base);
- String commitAcksString = DatatypeConverter.printBase64Binary(commitAcks);
- obj.put(FeedConstants.MessageConstants.COMMIT_ACKS, commitAcksString);
- return obj;
- }
-
- public static FeedTupleCommitAckMessage read(JSONObject obj) throws JSONException {
- FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
- obj.getString(FeedConstants.MessageConstants.FEED));
- FeedConnectionId connectionId = new FeedConnectionId(feedId,
- obj.getString(FeedConstants.MessageConstants.DATASET));
- int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
- int base = obj.getInt(FeedConstants.MessageConstants.BASE);
- String commitAcksString = obj.getString(FeedConstants.MessageConstants.COMMIT_ACKS);
- byte[] commitAcks = DatatypeConverter.parseBase64Binary(commitAcksString);
- return new FeedTupleCommitAckMessage(connectionId, intakePartition, base, commitAcks);
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public int getIntakePartition() {
- return intakePartition;
- }
-
- public byte[] getCommitAcks() {
- return commitAcks;
- }
-
- public void reset(int intakePartition, int base, byte[] commitAcks) {
- this.intakePartition = intakePartition;
- this.base = base;
- this.commitAcks = commitAcks;
- }
-
- public int getBase() {
- return base;
- }
-
- public void setBase(int base) {
- this.base = base;
- }
-
-}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
deleted file mode 100644
index f68f5ea..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedConstants;
-import org.apache.asterix.common.feeds.FeedId;
-
-public class FeedTupleCommitResponseMessage extends FeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- private final FeedConnectionId connectionId;
- private final int intakePartition;
- private final int maxWindowAcked;
-
- public FeedTupleCommitResponseMessage(FeedConnectionId connectionId, int intakePartition, int maxWindowAcked) {
- super(MessageType.COMMIT_ACK_RESPONSE);
- this.connectionId = connectionId;
- this.intakePartition = intakePartition;
- this.maxWindowAcked = maxWindowAcked;
- }
-
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject obj = new JSONObject();
- obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
- obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
- obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
- obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
- obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
- obj.put(FeedConstants.MessageConstants.MAX_WINDOW_ACKED, maxWindowAcked);
- return obj;
- }
-
- public static FeedTupleCommitResponseMessage read(JSONObject obj) throws JSONException {
- FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
- obj.getString(FeedConstants.MessageConstants.FEED));
- FeedConnectionId connectionId = new FeedConnectionId(feedId,
- obj.getString(FeedConstants.MessageConstants.DATASET));
- int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
- int maxWindowAcked = obj.getInt(FeedConstants.MessageConstants.MAX_WINDOW_ACKED);
- return new FeedTupleCommitResponseMessage(connectionId, intakePartition, maxWindowAcked);
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public int getMaxWindowAcked() {
- return maxWindowAcked;
- }
-
-}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/EndFeedMessage.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/EndFeedMessage.java
deleted file mode 100644
index 3f59859..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/EndFeedMessage.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-
-/**
- * A feed control message indicating the need to end the feed. This message is dispatched
- * to all locations that host an operator invovled in the feed pipeline.
- */
-public class EndFeedMessage extends FeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- private final FeedConnectionId feedId;
-
- public EndFeedMessage(FeedConnectionId feedId) {
- super(MessageType.END, feedId);
- this.feedId = feedId;
- }
-
- @Override
- public String toString() {
- return MessageType.END.name() + feedId;
- }
-}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedManagerElectMessage.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedManagerElectMessage.java
deleted file mode 100644
index 3dd7ca0..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedManagerElectMessage.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-
-/**
- * A feed control message containing the altered values for
- * adapter configuration parameters. This message is dispatched
- * to all runtime instances of the feed's adapter.
- */
-public class FeedManagerElectMessage extends FeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- private final String host;
- private final String nodeId;
- private final int port;
-
- public FeedManagerElectMessage(String host, String nodeId, int port, FeedConnectionId feedId) {
- super(MessageType.SUPER_FEED_MANAGER_ELECT, feedId);
- this.host = host;
- this.port = port;
- this.nodeId = nodeId;
- }
-
- @Override
- public MessageType getMessageType() {
- return MessageType.SUPER_FEED_MANAGER_ELECT;
- }
-
- @Override
- public String toString() {
- return MessageType.SUPER_FEED_MANAGER_ELECT.name() + " " + host + "_" + nodeId + "[" + port + "]";
- }
-
- public String getHost() {
- return host;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public int getPort() {
- return port;
- }
-
-}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessage.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessage.java
deleted file mode 100644
index 37d929b..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-
-/**
- * A control message that can be sent to the runtime instance of a
- * feed's adapter.
- */
-public class FeedMessage implements IFeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- protected final MessageType messageType;
- protected final FeedConnectionId feedId;
-
- public FeedMessage(MessageType messageType, FeedConnectionId feedId) {
- this.messageType = messageType;
- this.feedId = feedId;
- }
-
- public MessageType getMessageType() {
- return messageType;
- }
-
- public FeedConnectionId getFeedId() {
- return feedId;
- }
-
-}