Remove Key-Value Adapter

Change-Id: Iaa7d8d70b0869242d1a872f55f0c6928fda94dcb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/868
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <michael.blow@couchbase.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
deleted file mode 100644
index 41bcc46..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.kv;
-
-import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-import com.couchbase.client.core.CouchbaseCore;
-import com.couchbase.client.core.endpoint.dcp.DCPConnection;
-import com.couchbase.client.core.message.cluster.CloseBucketRequest;
-import com.couchbase.client.core.message.cluster.OpenBucketRequest;
-import com.couchbase.client.core.message.cluster.SeedNodesRequest;
-import com.couchbase.client.core.message.dcp.DCPRequest;
-import com.couchbase.client.core.message.dcp.MutationMessage;
-import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
-import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
-import com.couchbase.client.core.message.dcp.RemoveMessage;
-import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
-
-import rx.Subscriber;
-
-public class KVReader implements IRecordReader<DCPRequest> {
-
-    private static final Logger LOGGER = Logger.getLogger(KVReader.class);
-    private static final MutationMessage POISON_PILL = new MutationMessage(0, (short) 0, null, null, 0, 0L, 0L, 0, 0,
-            0L, null);
-    private final String feedName;
-    private final short[] vbuckets;
-    private final String bucket;
-    private final String password;
-    private final String[] sourceNodes;
-    private final CouchbaseCore core;
-    private final GenericRecord<DCPRequest> record;
-    private final ArrayBlockingQueue<DCPRequest> messages;
-    private AbstractFeedDataFlowController controller;
-    private Thread pushThread;
-    private boolean done = false;
-
-    public KVReader(String feedName, String bucket, String password, String[] sourceNodes, short[] vbuckets,
-            int queueSize, CouchbaseCore core) throws HyracksDataException {
-        this.feedName = feedName;
-        this.bucket = bucket;
-        this.password = password;
-        this.sourceNodes = sourceNodes;
-        this.vbuckets = vbuckets;
-        this.messages = new ArrayBlockingQueue<DCPRequest>(queueSize);
-        this.core = core;
-        this.record = new GenericRecord<>();
-        this.pushThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                KVReader.this.run();
-            }
-        }, feedName);
-        pushThread.start();
-    }
-
-    @Override
-    public void close() {
-        if (!done) {
-            done = true;
-        }
-    }
-
-    private void run() {
-        core.send(new SeedNodesRequest(sourceNodes)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT)
-                .toBlocking().single();
-        core.send(new OpenBucketRequest(bucket, password)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT)
-                .toBlocking().single();
-        DCPConnection connection = core.<OpenConnectionResponse> send(new OpenConnectionRequest(feedName, bucket))
-                .toBlocking().single().connection();
-        for (int i = 0; i < vbuckets.length; i++) {
-            connection.addStream(vbuckets[i]).toBlocking().single();
-        }
-        try {
-            connection.subject().toBlocking().subscribe(new Subscriber<DCPRequest>() {
-                @Override
-                public void onCompleted() {
-                }
-
-                @Override
-                public void onError(Throwable e) {
-                    e.printStackTrace();
-                }
-
-                @Override
-                public void onNext(DCPRequest dcpRequest) {
-                    try {
-                        if (dcpRequest instanceof SnapshotMarkerMessage) {
-                            SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
-                            LOGGER.info("snapshot DCP message received: " + message);
-                        } else if ((dcpRequest instanceof MutationMessage) || (dcpRequest instanceof RemoveMessage)) {
-                            messages.put(dcpRequest);
-                        } else {
-                            LOGGER.warn("Unknown type of DCP messages: " + dcpRequest);
-                        }
-                    } catch (Throwable th) {
-                        LOGGER.error(th);
-                    }
-                }
-            });
-        } catch (Throwable th) {
-            th.printStackTrace();
-            throw th;
-        }
-    }
-
-    @Override
-    public boolean hasNext() throws Exception {
-        return !done;
-    }
-
-    @Override
-    public IRawRecord<DCPRequest> next() throws IOException, InterruptedException {
-        if (messages.isEmpty()) {
-            controller.flush();
-        }
-        DCPRequest dcpRequest = messages.take();
-        if (dcpRequest == POISON_PILL) {
-            return null;
-        }
-        record.set(dcpRequest);
-        return record;
-    }
-
-    @Override
-    public boolean stop() {
-        done = true;
-        core.send(new CloseBucketRequest(bucket)).toBlocking();
-        pushThread.interrupt();
-        try {
-            messages.put(KVReader.POISON_PILL);
-        } catch (InterruptedException e) {
-            LOGGER.warn(e);
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public void setController(AbstractFeedDataFlowController controller) {
-        this.controller = controller;
-    }
-
-    @Override
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
-    }
-
-    @Override
-    public boolean handleException(Throwable th) {
-        return false;
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
deleted file mode 100644
index 70d53f6..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.kv;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-import com.couchbase.client.core.CouchbaseCore;
-import com.couchbase.client.core.config.CouchbaseBucketConfig;
-import com.couchbase.client.core.env.DefaultCoreEnvironment;
-import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
-import com.couchbase.client.core.message.cluster.CloseBucketRequest;
-import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
-import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
-import com.couchbase.client.core.message.cluster.OpenBucketRequest;
-import com.couchbase.client.core.message.cluster.SeedNodesRequest;
-import com.couchbase.client.core.message.dcp.DCPRequest;
-
-import rx.functions.Func1;
-
-public class KVReaderFactory implements IRecordReaderFactory<DCPRequest> {
-
-    private static final long serialVersionUID = 1L;
-    // Constant fields
-    public static final boolean DCP_ENABLED = true;
-    public static final long AUTO_RELEASE_AFTER_MILLISECONDS = 5000L;
-    public static final int TIMEOUT = 5;
-    public static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
-    // Dynamic fields
-    private Map<String, String> configuration;
-    private String bucket;
-    private String password = "";
-    private String[] couchbaseNodes;
-    private int numOfVBuckets;
-    private int[] schedule;
-    private String feedName;
-    // Transient fields
-    private static transient CouchbaseCore core;
-    private transient Builder builder;
-    private static transient DefaultCoreEnvironment env;
-    private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        if (locationConstraints == null) {
-            String[] allPartitions = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
-            Set<String> ncs = new HashSet<String>(Arrays.asList(allPartitions));
-            locationConstraints = new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()]));
-        }
-        return locationConstraints;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws AsterixException {
-        // validate first
-        if (!configuration.containsKey(ExternalDataConstants.KEY_BUCKET)) {
-            throw new AsterixException("Unspecified bucket");
-        }
-        if (!configuration.containsKey(ExternalDataConstants.KEY_NODES)) {
-            throw new AsterixException("Unspecified Couchbase nodes");
-        }
-        if (configuration.containsKey(ExternalDataConstants.KEY_PASSWORD)) {
-            password = configuration.get(ExternalDataConstants.KEY_PASSWORD);
-        }
-        this.configuration = configuration;
-        ExternalDataUtils.setNumberOfKeys(configuration, 1);
-        ExternalDataUtils.setChangeFeed(configuration, ExternalDataConstants.TRUE);
-        ExternalDataUtils.setRecordWithMeta(configuration, ExternalDataConstants.TRUE);
-        bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
-        couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(",");
-        feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
-        createEnvironment("CC");
-        getNumberOfVbuckets();
-        schedule();
-    }
-
-    private void createEnvironment(String connectionName) {
-        synchronized (TIME_UNIT) {
-            if (core == null) {
-                builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED).dcpConnectionName(connectionName)
-                        .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
-                env = builder.build();
-                core = new CouchbaseCore(env);
-            }
-        }
-    }
-
-    /*
-     * We distribute the work of streaming vbuckets between all the partitions in a round robin
-     * fashion.
-     */
-    private void schedule() {
-        schedule = new int[numOfVBuckets];
-        String[] locations = getPartitionConstraint().getLocations();
-        for (int i = 0; i < numOfVBuckets; i++) {
-            schedule[i] = i % locations.length;
-        }
-    }
-
-    private void getNumberOfVbuckets() {
-        core.send(new SeedNodesRequest(couchbaseNodes)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
-        core.send(new OpenBucketRequest(bucket, password)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
-        numOfVBuckets = core.<GetClusterConfigResponse> send(new GetClusterConfigRequest())
-                .map(new Func1<GetClusterConfigResponse, Integer>() {
-                    @Override
-                    public Integer call(GetClusterConfigResponse response) {
-                        CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
-                        return config.numberOfPartitions();
-                    }
-                }).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
-        core.send(new CloseBucketRequest(bucket)).toBlocking();
-    }
-
-    @Override
-    public IRecordReader<? extends DCPRequest> createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
-        createEnvironment(nodeName);
-        ArrayList<Short> listOfAssignedVBuckets = new ArrayList<Short>();
-        for (int i = 0; i < schedule.length; i++) {
-            if (schedule[i] == partition) {
-                listOfAssignedVBuckets.add((short) i);
-            }
-        }
-        short[] vbuckets = new short[listOfAssignedVBuckets.size()];
-        for (int i = 0; i < vbuckets.length; i++) {
-            vbuckets[i] = listOfAssignedVBuckets.get(i);
-        }
-        return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes, vbuckets,
-                ExternalDataUtils.getQueueSize(configuration), core);
-    }
-
-    @Override
-    public Class<?> getRecordClass() {
-        return DCPRequest.class;
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 1dd2fe8..f71e9a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
 import org.apache.asterix.external.input.record.reader.RecordWithPKTestReaderFactory;
-import org.apache.asterix.external.input.record.reader.kv.KVReaderFactory;
 import org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory;
 import org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
@@ -91,8 +90,6 @@
             return ExternalDataUtils.createExternalRecordReaderFactory(configuration);
         }
         switch (reader) {
-            case ExternalDataConstants.READER_KV:
-                return new KVReaderFactory();
             case ExternalDataConstants.READER_KV_TEST:
                 return new KVTestReaderFactory();
             case ExternalDataConstants.READER_HDFS:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index cf2b7f3..81f8377 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -120,7 +120,6 @@
      * Builtin record readers
      */
     public static final String READER_HDFS = "hdfs";
-    public static final String READER_KV = "key-value";
     public static final String READER_TWITTER_PUSH = "twitter_push";
     public static final String READER_PUSH_TWITTER = "push_twitter";
     public static final String READER_TWITTER_PULL = "twitter_pull";