Update the Key Value Reader

Upgrade couchbase core-io from 1.2.3 to 1.2.7 and drop the explicit rxjava
dependency from asterix-external-data. Port KVReader from the
BucketStreamAggregator-based DCP API to DCPConnection: open the connection
with an OpenConnectionRequest, add a stream per assigned vbucket, and
subscribe to the connection's subject. KVReaderFactory now lazily creates a
single shared CouchbaseCore (passed into each KVReader), deduplicates node
names in its partition constraint, and schedules vbuckets over that
constraint. The converter releases mutation content buffers after decoding
them, and MutationMessage/RemoveMessage construction is adjusted to the new
constructor signatures.
Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699
Reviewed-on: https://asterix-gerrit.ics.uci.edu/805
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 8c59cc4..f7ac6a9 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -287,12 +287,7 @@
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>core-io</artifactId>
- <version>1.2.3</version>
- </dependency>
- <dependency>
- <groupId>io.reactivex</groupId>
- <artifactId>rxjava</artifactId>
- <version>1.0.15</version>
+ <version>1.2.7</version>
</dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
index 379bbf2..724699c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
@@ -37,6 +37,7 @@
import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.core.message.dcp.RemoveMessage;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.deps.io.netty.util.ReferenceCountUtil;
public class DCPRequestToRecordWithMetadataAndPKConverter
implements IRecordToRecordWithMetadataAndPKConverter<DCPRequest, char[]> {
@@ -88,6 +89,7 @@
recordWithMetadata.setMetadata(8, revSeqNumber);
recordWithMetadata.setMetadata(9, lockTime);
DCPRequestToRecordWithMetadataAndPKConverter.set(message.content(), decoder, bytes, chars, value);
+ ReferenceCountUtil.release(message.content());
} else if (dcpRequest instanceof RemoveMessage) {
final RemoveMessage message = (RemoveMessage) dcpRequest;
final String key = message.key();
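For context, the added ReferenceCountUtil.release(message.content()) call pairs the converter's read of the mutation body with an explicit release of the reference-counted buffer. The sketch below illustrates that read-then-release pattern; it is not part of this patch, uses plain Netty (io.netty) rather than the shaded com.couchbase.client.deps.io.netty classes, and its class name and literals are illustrative only.

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.util.ReferenceCountUtil;

    import java.nio.charset.StandardCharsets;

    public class ReleaseAfterDecode {
        public static void main(String[] args) {
            // A reference-counted buffer standing in for MutationMessage#content().
            ByteBuf content = Unpooled.copiedBuffer("{\"id\":1}", StandardCharsets.UTF_8);
            try {
                // Copy the bytes out (the converter decodes them into a char[] record).
                String json = content.toString(StandardCharsets.UTF_8);
                System.out.println(json);
            } finally {
                // Drop the reference once the bytes are consumed so pooled buffers do not leak.
                ReferenceCountUtil.release(content);
            }
        }
    }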
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
index 185aea0..41bcc46 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
@@ -30,36 +30,30 @@
import org.apache.log4j.Logger;
import com.couchbase.client.core.CouchbaseCore;
-import com.couchbase.client.core.dcp.BucketStreamAggregator;
-import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
-import com.couchbase.client.core.dcp.BucketStreamState;
-import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
-import com.couchbase.client.core.env.DefaultCoreEnvironment;
-import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.endpoint.dcp.DCPConnection;
import com.couchbase.client.core.message.cluster.CloseBucketRequest;
import com.couchbase.client.core.message.cluster.OpenBucketRequest;
import com.couchbase.client.core.message.cluster.SeedNodesRequest;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
+import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.RemoveMessage;
import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
-import rx.functions.Action1;
+import rx.Subscriber;
public class KVReader implements IRecordReader<DCPRequest> {
private static final Logger LOGGER = Logger.getLogger(KVReader.class);
- private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
- null);
+ private static final MutationMessage POISON_PILL = new MutationMessage(0, (short) 0, null, null, 0, 0L, 0L, 0, 0,
+ 0L, null);
private final String feedName;
private final short[] vbuckets;
private final String bucket;
private final String password;
private final String[] sourceNodes;
- private final Builder builder;
- private final BucketStreamAggregator bucketStreamAggregator;
private final CouchbaseCore core;
- private final DefaultCoreEnvironment env;
private final GenericRecord<DCPRequest> record;
private final ArrayBlockingQueue<DCPRequest> messages;
private AbstractFeedDataFlowController controller;
@@ -67,20 +61,22 @@
private boolean done = false;
public KVReader(String feedName, String bucket, String password, String[] sourceNodes, short[] vbuckets,
- int queueSize) throws HyracksDataException {
+ int queueSize, CouchbaseCore core) throws HyracksDataException {
this.feedName = feedName;
this.bucket = bucket;
this.password = password;
this.sourceNodes = sourceNodes;
this.vbuckets = vbuckets;
this.messages = new ArrayBlockingQueue<DCPRequest>(queueSize);
- this.builder = DefaultCoreEnvironment.builder().dcpEnabled(KVReaderFactory.DCP_ENABLED)
- .autoreleaseAfter(KVReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS);
- this.env = builder.build();
- this.core = new CouchbaseCore(env);
- this.bucketStreamAggregator = new BucketStreamAggregator(feedName, core, bucket);
+ this.core = core;
this.record = new GenericRecord<>();
- connect();
+ this.pushThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ KVReader.this.run();
+ }
+ }, feedName);
+ pushThread.start();
}
@Override
@@ -90,44 +86,33 @@
}
}
- private void connect() {
+ private void run() {
core.send(new SeedNodesRequest(sourceNodes)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT)
.toBlocking().single();
core.send(new OpenBucketRequest(bucket, password)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT)
.toBlocking().single();
- this.pushThread = new Thread(new Runnable() {
- @Override
- public void run() {
- KVReader.this.run(bucketStreamAggregator);
- }
- }, feedName);
- pushThread.start();
- }
-
- private void run(BucketStreamAggregator bucketStreamAggregator) {
- BucketStreamAggregatorState state = new BucketStreamAggregatorState();
+ DCPConnection connection = core.<OpenConnectionResponse> send(new OpenConnectionRequest(feedName, bucket))
+ .toBlocking().single().connection();
for (int i = 0; i < vbuckets.length; i++) {
- state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0, 0xffffffff));
+ connection.addStream(vbuckets[i]).toBlocking().single();
}
- state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() {
- @Override
- public void call(BucketStreamStateUpdatedEvent event) {
- if (event.partialUpdate()) {
- } else {
- }
- }
- });
try {
- bucketStreamAggregator.feed(state).toBlocking().forEach(new Action1<DCPRequest>() {
+ connection.subject().toBlocking().subscribe(new Subscriber<DCPRequest>() {
@Override
- public void call(DCPRequest dcpRequest) {
+ public void onCompleted() {
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ e.printStackTrace();
+ }
+
+ @Override
+ public void onNext(DCPRequest dcpRequest) {
try {
if (dcpRequest instanceof SnapshotMarkerMessage) {
SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
- BucketStreamState oldState = state.get(message.partition());
- state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(),
- message.endSequenceNumber(), oldState.endSequenceNumber(),
- message.endSequenceNumber(), oldState.snapshotEndSequenceNumber()));
+ LOGGER.info("snapshot DCP message received: " + message);
} else if ((dcpRequest instanceof MutationMessage) || (dcpRequest instanceof RemoveMessage)) {
messages.put(dcpRequest);
} else {
@@ -139,13 +124,7 @@
}
});
} catch (Throwable th) {
- if (th.getCause() instanceof InterruptedException) {
- LOGGER.warn("dcp thread was interrupted", th);
- synchronized (this) {
- KVReader.this.close();
- notifyAll();
- }
- }
+ th.printStackTrace();
throw th;
}
}
@@ -172,6 +151,7 @@
public boolean stop() {
done = true;
core.send(new CloseBucketRequest(bucket)).toBlocking();
+ pushThread.interrupt();
try {
messages.put(KVReader.POISON_PILL);
} catch (InterruptedException e) {
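The reader hands DCP events from the push thread to next() through a bounded queue, and stop() enqueues POISON_PILL so the consumer can finish draining and terminate. Below is a minimal, self-contained sketch of that handoff using only JDK classes; it is not part of this patch, and the Event type, queue size, and method names are illustrative.

    import java.util.concurrent.ArrayBlockingQueue;

    public class KVQueueSketch {
        static final class Event {
            final String payload;
            Event(String payload) { this.payload = payload; }
        }

        private static final Event PILL = new Event(null);            // sentinel marking end of stream
        private final ArrayBlockingQueue<Event> messages = new ArrayBlockingQueue<>(16);

        void produce(String payload) throws InterruptedException {
            messages.put(new Event(payload));                          // blocks when the queue is full
        }

        void stop() throws InterruptedException {
            messages.put(PILL);                                        // wake the consumer and signal shutdown
        }

        Event next() throws InterruptedException {
            Event e = messages.take();                                 // blocks until an event arrives
            return e == PILL ? null : e;                               // null mirrors "no more records"
        }

        public static void main(String[] args) throws InterruptedException {
            KVQueueSketch q = new KVQueueSketch();
            q.produce("mutation-1");
            q.stop();
            System.out.println(q.next().payload);                      // mutation-1
            System.out.println(q.next());                              // null after the pill
        }
    }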
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
index bc2a980..70d53f6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
@@ -19,7 +19,10 @@
package org.apache.asterix.external.input.record.reader.kv;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -62,13 +65,19 @@
private int[] schedule;
private String feedName;
// Transient fields
- private transient CouchbaseCore core;
+ private static transient CouchbaseCore core;
private transient Builder builder;
- private transient DefaultCoreEnvironment env;
+ private static transient DefaultCoreEnvironment env;
+ private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
@Override
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- return AsterixClusterProperties.INSTANCE.getClusterLocations();
+ if (locationConstraints == null) {
+ String[] allPartitions = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
+ Set<String> ncs = new HashSet<String>(Arrays.asList(allPartitions));
+ locationConstraints = new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()]));
+ }
+ return locationConstraints;
}
@Override
@@ -90,21 +99,29 @@
bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(",");
feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
- builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED)
- .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
- env = builder.build();
- core = new CouchbaseCore(env);
+ createEnvironment("CC");
getNumberOfVbuckets();
schedule();
}
+ private void createEnvironment(String connectionName) {
+ synchronized (TIME_UNIT) {
+ if (core == null) {
+ builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED).dcpConnectionName(connectionName)
+ .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
+ env = builder.build();
+ core = new CouchbaseCore(env);
+ }
+ }
+ }
+
/*
* We distribute the work of streaming vbuckets between all the partitions in a round robin
* fashion.
*/
private void schedule() {
schedule = new int[numOfVBuckets];
- String[] locations = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
+ String[] locations = getPartitionConstraint().getLocations();
for (int i = 0; i < numOfVBuckets; i++) {
schedule[i] = i % locations.length;
}
@@ -128,6 +145,7 @@
public IRecordReader<? extends DCPRequest> createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ createEnvironment(nodeName);
ArrayList<Short> listOfAssignedVBuckets = new ArrayList<Short>();
for (int i = 0; i < schedule.length; i++) {
if (schedule[i] == partition) {
@@ -138,8 +156,8 @@
for (int i = 0; i < vbuckets.length; i++) {
vbuckets[i] = listOfAssignedVBuckets.get(i);
}
- return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes,
- vbuckets, ExternalDataUtils.getQueueSize(configuration));
+ return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes, vbuckets,
+ ExternalDataUtils.getQueueSize(configuration), core);
}
@Override
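For reference, schedule() still assigns vbucket i to partition i % locations.length, but locations now come from the deduplicated constraint returned by getPartitionConstraint(). A small sketch of that round-robin assignment follows; it is not part of this patch, and the node names and vbucket count are made up.

    import java.util.ArrayList;
    import java.util.List;

    public class VBucketSchedule {
        public static void main(String[] args) {
            String[] locations = { "nc1", "nc2", "nc3" };    // deduplicated node controller names
            int numOfVBuckets = 8;

            int[] schedule = new int[numOfVBuckets];
            for (int i = 0; i < numOfVBuckets; i++) {
                schedule[i] = i % locations.length;          // round robin over partitions
            }

            // Collect the vbuckets assigned to each partition, as createRecordReader() does.
            for (int partition = 0; partition < locations.length; partition++) {
                List<Short> assigned = new ArrayList<>();
                for (short vb = 0; vb < numOfVBuckets; vb++) {
                    if (schedule[vb] == partition) {
                        assigned.add(vb);
                    }
                }
                System.out.println(locations[partition] + " -> " + assigned);
            }
        }
    }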
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
index b75f26c..f7fe77f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
@@ -80,7 +80,6 @@
}
}
this.byteBuff = ByteBufAllocator.DEFAULT.buffer(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
- byteBuff.retain();
this.record = new GenericRecord<DCPRequest>();
this.counter = counterStart;
}
@@ -132,7 +131,7 @@
if (nextDeleteKey != null) {
final String key = nextDeleteKey;
nextDeleteKey = null;
- return new RemoveMessage(nextDeletePartition, key, cas++, seq++, 0L, bucket);
+ return new RemoveMessage(0, nextDeletePartition, key, cas++, seq++, 0L, bucket);
}
}
generateNextDocument();
@@ -141,15 +140,16 @@
final String key = nextUpsertKey;
nextUpsertKey = null;
upsertCounter++;
- return new MutationMessage(nextUpsertPartition, key, byteBuff, expiration++, seq++, 0, 0, lockTime++,
- cas++, bucket);
+ return new MutationMessage(byteBuff.readableBytes(), nextUpsertPartition, key, byteBuff, expiration++,
+ seq++, 0, 0, lockTime++, cas++, bucket);
}
}
- return new MutationMessage(assigned.get(counter % assigned.size()), generateKey(), byteBuff, expiration++,
- seq++, 0, 0, lockTime++, cas++, bucket);
+ return new MutationMessage(byteBuff.readableBytes(), assigned.get(counter % assigned.size()), generateKey(),
+ byteBuff, expiration++, seq++, 0, 0, lockTime++, cas++, bucket);
}
private void generateNextDocument() {
+ byteBuff.retain();
// reset the string
strBuilder.setLength(0);
strBuilder.append("{\"id\":" + (counter + upsertCounter) + ",\"name\":\""
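Finally, KVTestReader now calls retain() once per generated document instead of once at construction, so each extra reference taken on the shared byteBuff is matched by the release performed after the record is consumed. A minimal sketch of that retain-per-use pairing, again outside this patch, with plain Netty in place of the shaded classes and illustrative payloads:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.util.ReferenceCountUtil;

    import java.nio.charset.StandardCharsets;

    public class RetainPerDocument {
        public static void main(String[] args) {
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(64);       // refCnt == 1
            for (int i = 0; i < 3; i++) {
                buffer.retain();                                         // one extra reference per document
                buffer.clear();
                buffer.writeBytes(("{\"id\":" + i + "}").getBytes(StandardCharsets.UTF_8));

                // Consumer side: read the document, then drop the per-document reference.
                System.out.println(buffer.toString(StandardCharsets.UTF_8));
                ReferenceCountUtil.release(buffer);                      // back to refCnt == 1
            }
            buffer.release();                                            // drop the original reference
        }
    }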