Remove Key-Value Adapter
Change-Id: Iaa7d8d70b0869242d1a872f55f0c6928fda94dcb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/868
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <michael.blow@couchbase.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
deleted file mode 100644
index 41bcc46..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.kv;
-
-import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-import com.couchbase.client.core.CouchbaseCore;
-import com.couchbase.client.core.endpoint.dcp.DCPConnection;
-import com.couchbase.client.core.message.cluster.CloseBucketRequest;
-import com.couchbase.client.core.message.cluster.OpenBucketRequest;
-import com.couchbase.client.core.message.cluster.SeedNodesRequest;
-import com.couchbase.client.core.message.dcp.DCPRequest;
-import com.couchbase.client.core.message.dcp.MutationMessage;
-import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
-import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
-import com.couchbase.client.core.message.dcp.RemoveMessage;
-import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
-
-import rx.Subscriber;
-
-public class KVReader implements IRecordReader<DCPRequest> {
-
- private static final Logger LOGGER = Logger.getLogger(KVReader.class);
- private static final MutationMessage POISON_PILL = new MutationMessage(0, (short) 0, null, null, 0, 0L, 0L, 0, 0,
- 0L, null);
- private final String feedName;
- private final short[] vbuckets;
- private final String bucket;
- private final String password;
- private final String[] sourceNodes;
- private final CouchbaseCore core;
- private final GenericRecord<DCPRequest> record;
- private final ArrayBlockingQueue<DCPRequest> messages;
- private AbstractFeedDataFlowController controller;
- private Thread pushThread;
- private boolean done = false;
-
- public KVReader(String feedName, String bucket, String password, String[] sourceNodes, short[] vbuckets,
- int queueSize, CouchbaseCore core) throws HyracksDataException {
- this.feedName = feedName;
- this.bucket = bucket;
- this.password = password;
- this.sourceNodes = sourceNodes;
- this.vbuckets = vbuckets;
- this.messages = new ArrayBlockingQueue<DCPRequest>(queueSize);
- this.core = core;
- this.record = new GenericRecord<>();
- this.pushThread = new Thread(new Runnable() {
- @Override
- public void run() {
- KVReader.this.run();
- }
- }, feedName);
- pushThread.start();
- }
-
- @Override
- public void close() {
- if (!done) {
- done = true;
- }
- }
-
- private void run() {
- core.send(new SeedNodesRequest(sourceNodes)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT)
- .toBlocking().single();
- core.send(new OpenBucketRequest(bucket, password)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT)
- .toBlocking().single();
- DCPConnection connection = core.<OpenConnectionResponse> send(new OpenConnectionRequest(feedName, bucket))
- .toBlocking().single().connection();
- for (int i = 0; i < vbuckets.length; i++) {
- connection.addStream(vbuckets[i]).toBlocking().single();
- }
- try {
- connection.subject().toBlocking().subscribe(new Subscriber<DCPRequest>() {
- @Override
- public void onCompleted() {
- }
-
- @Override
- public void onError(Throwable e) {
- e.printStackTrace();
- }
-
- @Override
- public void onNext(DCPRequest dcpRequest) {
- try {
- if (dcpRequest instanceof SnapshotMarkerMessage) {
- SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
- LOGGER.info("snapshot DCP message received: " + message);
- } else if ((dcpRequest instanceof MutationMessage) || (dcpRequest instanceof RemoveMessage)) {
- messages.put(dcpRequest);
- } else {
- LOGGER.warn("Unknown type of DCP messages: " + dcpRequest);
- }
- } catch (Throwable th) {
- LOGGER.error(th);
- }
- }
- });
- } catch (Throwable th) {
- th.printStackTrace();
- throw th;
- }
- }
-
- @Override
- public boolean hasNext() throws Exception {
- return !done;
- }
-
- @Override
- public IRawRecord<DCPRequest> next() throws IOException, InterruptedException {
- if (messages.isEmpty()) {
- controller.flush();
- }
- DCPRequest dcpRequest = messages.take();
- if (dcpRequest == POISON_PILL) {
- return null;
- }
- record.set(dcpRequest);
- return record;
- }
-
- @Override
- public boolean stop() {
- done = true;
- core.send(new CloseBucketRequest(bucket)).toBlocking();
- pushThread.interrupt();
- try {
- messages.put(KVReader.POISON_PILL);
- } catch (InterruptedException e) {
- LOGGER.warn(e);
- return false;
- }
- return true;
- }
-
- @Override
- public void setController(AbstractFeedDataFlowController controller) {
- this.controller = controller;
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
- }
-
- @Override
- public boolean handleException(Throwable th) {
- return false;
- }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
deleted file mode 100644
index 70d53f6..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.kv;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-import com.couchbase.client.core.CouchbaseCore;
-import com.couchbase.client.core.config.CouchbaseBucketConfig;
-import com.couchbase.client.core.env.DefaultCoreEnvironment;
-import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
-import com.couchbase.client.core.message.cluster.CloseBucketRequest;
-import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
-import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
-import com.couchbase.client.core.message.cluster.OpenBucketRequest;
-import com.couchbase.client.core.message.cluster.SeedNodesRequest;
-import com.couchbase.client.core.message.dcp.DCPRequest;
-
-import rx.functions.Func1;
-
-public class KVReaderFactory implements IRecordReaderFactory<DCPRequest> {
-
- private static final long serialVersionUID = 1L;
- // Constant fields
- public static final boolean DCP_ENABLED = true;
- public static final long AUTO_RELEASE_AFTER_MILLISECONDS = 5000L;
- public static final int TIMEOUT = 5;
- public static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
- // Dynamic fields
- private Map<String, String> configuration;
- private String bucket;
- private String password = "";
- private String[] couchbaseNodes;
- private int numOfVBuckets;
- private int[] schedule;
- private String feedName;
- // Transient fields
- private static transient CouchbaseCore core;
- private transient Builder builder;
- private static transient DefaultCoreEnvironment env;
- private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
-
- @Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- if (locationConstraints == null) {
- String[] allPartitions = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
- Set<String> ncs = new HashSet<String>(Arrays.asList(allPartitions));
- locationConstraints = new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()]));
- }
- return locationConstraints;
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws AsterixException {
- // validate first
- if (!configuration.containsKey(ExternalDataConstants.KEY_BUCKET)) {
- throw new AsterixException("Unspecified bucket");
- }
- if (!configuration.containsKey(ExternalDataConstants.KEY_NODES)) {
- throw new AsterixException("Unspecified Couchbase nodes");
- }
- if (configuration.containsKey(ExternalDataConstants.KEY_PASSWORD)) {
- password = configuration.get(ExternalDataConstants.KEY_PASSWORD);
- }
- this.configuration = configuration;
- ExternalDataUtils.setNumberOfKeys(configuration, 1);
- ExternalDataUtils.setChangeFeed(configuration, ExternalDataConstants.TRUE);
- ExternalDataUtils.setRecordWithMeta(configuration, ExternalDataConstants.TRUE);
- bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
- couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(",");
- feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
- createEnvironment("CC");
- getNumberOfVbuckets();
- schedule();
- }
-
- private void createEnvironment(String connectionName) {
- synchronized (TIME_UNIT) {
- if (core == null) {
- builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED).dcpConnectionName(connectionName)
- .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
- env = builder.build();
- core = new CouchbaseCore(env);
- }
- }
- }
-
- /*
- * We distribute the work of streaming vbuckets between all the partitions in a round robin
- * fashion.
- */
- private void schedule() {
- schedule = new int[numOfVBuckets];
- String[] locations = getPartitionConstraint().getLocations();
- for (int i = 0; i < numOfVBuckets; i++) {
- schedule[i] = i % locations.length;
- }
- }
-
- private void getNumberOfVbuckets() {
- core.send(new SeedNodesRequest(couchbaseNodes)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
- core.send(new OpenBucketRequest(bucket, password)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
- numOfVBuckets = core.<GetClusterConfigResponse> send(new GetClusterConfigRequest())
- .map(new Func1<GetClusterConfigResponse, Integer>() {
- @Override
- public Integer call(GetClusterConfigResponse response) {
- CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
- return config.numberOfPartitions();
- }
- }).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
- core.send(new CloseBucketRequest(bucket)).toBlocking();
- }
-
- @Override
- public IRecordReader<? extends DCPRequest> createRecordReader(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
- createEnvironment(nodeName);
- ArrayList<Short> listOfAssignedVBuckets = new ArrayList<Short>();
- for (int i = 0; i < schedule.length; i++) {
- if (schedule[i] == partition) {
- listOfAssignedVBuckets.add((short) i);
- }
- }
- short[] vbuckets = new short[listOfAssignedVBuckets.size()];
- for (int i = 0; i < vbuckets.length; i++) {
- vbuckets[i] = listOfAssignedVBuckets.get(i);
- }
- return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes, vbuckets,
- ExternalDataUtils.getQueueSize(configuration), core);
- }
-
- @Override
- public Class<?> getRecordClass() {
- return DCPRequest.class;
- }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 1dd2fe8..f71e9a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -27,7 +27,6 @@
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.RecordWithPKTestReaderFactory;
-import org.apache.asterix.external.input.record.reader.kv.KVReaderFactory;
import org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory;
import org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory;
import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
@@ -91,8 +90,6 @@
return ExternalDataUtils.createExternalRecordReaderFactory(configuration);
}
switch (reader) {
- case ExternalDataConstants.READER_KV:
- return new KVReaderFactory();
case ExternalDataConstants.READER_KV_TEST:
return new KVTestReaderFactory();
case ExternalDataConstants.READER_HDFS:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index cf2b7f3..81f8377 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -120,7 +120,6 @@
* Builtin record readers
*/
public static final String READER_HDFS = "hdfs";
- public static final String READER_KV = "key-value";
public static final String READER_TWITTER_PUSH = "twitter_push";
public static final String READER_PUSH_TWITTER = "push_twitter";
public static final String READER_TWITTER_PULL = "twitter_pull";