Update the Key Value Reader

Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699
Reviewed-on: https://asterix-gerrit.ics.uci.edu/805
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 8c59cc4..f7ac6a9 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -287,12 +287,7 @@
         <dependency>
             <groupId>com.couchbase.client</groupId>
             <artifactId>core-io</artifactId>
-            <version>1.2.3</version>
-        </dependency>
-        <dependency>
-            <groupId>io.reactivex</groupId>
-            <artifactId>rxjava</artifactId>
-            <version>1.0.15</version>
+            <version>1.2.7</version>
         </dependency>
     </dependencies>
 </project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
index 379bbf2..724699c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
@@ -37,6 +37,7 @@
 import com.couchbase.client.core.message.dcp.MutationMessage;
 import com.couchbase.client.core.message.dcp.RemoveMessage;
 import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.deps.io.netty.util.ReferenceCountUtil;
 
 public class DCPRequestToRecordWithMetadataAndPKConverter
         implements IRecordToRecordWithMetadataAndPKConverter<DCPRequest, char[]> {
@@ -88,6 +89,7 @@
             recordWithMetadata.setMetadata(8, revSeqNumber);
             recordWithMetadata.setMetadata(9, lockTime);
             DCPRequestToRecordWithMetadataAndPKConverter.set(message.content(), decoder, bytes, chars, value);
+            ReferenceCountUtil.release(message.content());
         } else if (dcpRequest instanceof RemoveMessage) {
             final RemoveMessage message = (RemoveMessage) dcpRequest;
             final String key = message.key();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
index 185aea0..41bcc46 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
@@ -30,36 +30,30 @@
 import org.apache.log4j.Logger;
 
 import com.couchbase.client.core.CouchbaseCore;
-import com.couchbase.client.core.dcp.BucketStreamAggregator;
-import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
-import com.couchbase.client.core.dcp.BucketStreamState;
-import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
-import com.couchbase.client.core.env.DefaultCoreEnvironment;
-import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.endpoint.dcp.DCPConnection;
 import com.couchbase.client.core.message.cluster.CloseBucketRequest;
 import com.couchbase.client.core.message.cluster.OpenBucketRequest;
 import com.couchbase.client.core.message.cluster.SeedNodesRequest;
 import com.couchbase.client.core.message.dcp.DCPRequest;
 import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
+import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
 import com.couchbase.client.core.message.dcp.RemoveMessage;
 import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
 
-import rx.functions.Action1;
+import rx.Subscriber;
 
 public class KVReader implements IRecordReader<DCPRequest> {
 
     private static final Logger LOGGER = Logger.getLogger(KVReader.class);
-    private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
-            null);
+    private static final MutationMessage POISON_PILL = new MutationMessage(0, (short) 0, null, null, 0, 0L, 0L, 0, 0,
+            0L, null);
     private final String feedName;
     private final short[] vbuckets;
     private final String bucket;
     private final String password;
     private final String[] sourceNodes;
-    private final Builder builder;
-    private final BucketStreamAggregator bucketStreamAggregator;
     private final CouchbaseCore core;
-    private final DefaultCoreEnvironment env;
     private final GenericRecord<DCPRequest> record;
     private final ArrayBlockingQueue<DCPRequest> messages;
     private AbstractFeedDataFlowController controller;
@@ -67,20 +61,22 @@
     private boolean done = false;
 
     public KVReader(String feedName, String bucket, String password, String[] sourceNodes, short[] vbuckets,
-            int queueSize) throws HyracksDataException {
+            int queueSize, CouchbaseCore core) throws HyracksDataException {
         this.feedName = feedName;
         this.bucket = bucket;
         this.password = password;
         this.sourceNodes = sourceNodes;
         this.vbuckets = vbuckets;
         this.messages = new ArrayBlockingQueue<DCPRequest>(queueSize);
-        this.builder = DefaultCoreEnvironment.builder().dcpEnabled(KVReaderFactory.DCP_ENABLED)
-                .autoreleaseAfter(KVReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS);
-        this.env = builder.build();
-        this.core = new CouchbaseCore(env);
-        this.bucketStreamAggregator = new BucketStreamAggregator(feedName, core, bucket);
+        this.core = core;
         this.record = new GenericRecord<>();
-        connect();
+        this.pushThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                KVReader.this.run();
+            }
+        }, feedName);
+        pushThread.start();
     }
 
     @Override
@@ -90,44 +86,33 @@
         }
     }
 
-    private void connect() {
+    private void run() {
         core.send(new SeedNodesRequest(sourceNodes)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT)
                 .toBlocking().single();
         core.send(new OpenBucketRequest(bucket, password)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT)
                 .toBlocking().single();
-        this.pushThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                KVReader.this.run(bucketStreamAggregator);
-            }
-        }, feedName);
-        pushThread.start();
-    }
-
-    private void run(BucketStreamAggregator bucketStreamAggregator) {
-        BucketStreamAggregatorState state = new BucketStreamAggregatorState();
+        DCPConnection connection = core.<OpenConnectionResponse> send(new OpenConnectionRequest(feedName, bucket))
+                .toBlocking().single().connection();
         for (int i = 0; i < vbuckets.length; i++) {
-            state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0, 0xffffffff));
+            connection.addStream(vbuckets[i]).toBlocking().single();
         }
-        state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() {
-            @Override
-            public void call(BucketStreamStateUpdatedEvent event) {
-                if (event.partialUpdate()) {
-                } else {
-                }
-            }
-        });
         try {
-            bucketStreamAggregator.feed(state).toBlocking().forEach(new Action1<DCPRequest>() {
+            connection.subject().toBlocking().subscribe(new Subscriber<DCPRequest>() {
                 @Override
-                public void call(DCPRequest dcpRequest) {
+                public void onCompleted() {
+                }
+
+                @Override
+                public void onError(Throwable e) {
+                    e.printStackTrace();
+                }
+
+                @Override
+                public void onNext(DCPRequest dcpRequest) {
                     try {
                         if (dcpRequest instanceof SnapshotMarkerMessage) {
                             SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
-                            BucketStreamState oldState = state.get(message.partition());
-                            state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(),
-                                    message.endSequenceNumber(), oldState.endSequenceNumber(),
-                                    message.endSequenceNumber(), oldState.snapshotEndSequenceNumber()));
+                            LOGGER.info("snapshot DCP message received: " + message);
                         } else if ((dcpRequest instanceof MutationMessage) || (dcpRequest instanceof RemoveMessage)) {
                             messages.put(dcpRequest);
                         } else {
@@ -139,13 +124,7 @@
                 }
             });
         } catch (Throwable th) {
-            if (th.getCause() instanceof InterruptedException) {
-                LOGGER.warn("dcp thread was interrupted", th);
-                synchronized (this) {
-                    KVReader.this.close();
-                    notifyAll();
-                }
-            }
+            th.printStackTrace();
             throw th;
         }
     }
@@ -172,6 +151,7 @@
     public boolean stop() {
         done = true;
         core.send(new CloseBucketRequest(bucket)).toBlocking();
+        pushThread.interrupt();
         try {
             messages.put(KVReader.POISON_PILL);
         } catch (InterruptedException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
index bc2a980..70d53f6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
@@ -19,7 +19,10 @@
 package org.apache.asterix.external.input.record.reader.kv;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -62,13 +65,19 @@
     private int[] schedule;
     private String feedName;
     // Transient fields
-    private transient CouchbaseCore core;
+    private static transient CouchbaseCore core;
     private transient Builder builder;
-    private transient DefaultCoreEnvironment env;
+    private static transient DefaultCoreEnvironment env;
+    private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        return AsterixClusterProperties.INSTANCE.getClusterLocations();
+        if (locationConstraints == null) {
+            String[] allPartitions = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
+            Set<String> ncs = new HashSet<String>(Arrays.asList(allPartitions));
+            locationConstraints = new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()]));
+        }
+        return locationConstraints;
     }
 
     @Override
@@ -90,21 +99,29 @@
         bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
         couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(",");
         feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
-        builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED)
-                .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
-        env = builder.build();
-        core = new CouchbaseCore(env);
+        createEnvironment("CC");
         getNumberOfVbuckets();
         schedule();
     }
 
+    private void createEnvironment(String connectionName) {
+        synchronized (TIME_UNIT) {
+            if (core == null) {
+                builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED).dcpConnectionName(connectionName)
+                        .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
+                env = builder.build();
+                core = new CouchbaseCore(env);
+            }
+        }
+    }
+
     /*
      * We distribute the work of streaming vbuckets between all the partitions in a round robin
      * fashion.
      */
     private void schedule() {
         schedule = new int[numOfVBuckets];
-        String[] locations = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
+        String[] locations = getPartitionConstraint().getLocations();
         for (int i = 0; i < numOfVBuckets; i++) {
             schedule[i] = i % locations.length;
         }
@@ -128,6 +145,7 @@
     public IRecordReader<? extends DCPRequest> createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
         String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+        createEnvironment(nodeName);
         ArrayList<Short> listOfAssignedVBuckets = new ArrayList<Short>();
         for (int i = 0; i < schedule.length; i++) {
             if (schedule[i] == partition) {
@@ -138,8 +156,8 @@
         for (int i = 0; i < vbuckets.length; i++) {
             vbuckets[i] = listOfAssignedVBuckets.get(i);
         }
-        return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes,
-                vbuckets, ExternalDataUtils.getQueueSize(configuration));
+        return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes, vbuckets,
+                ExternalDataUtils.getQueueSize(configuration), core);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
index b75f26c..f7fe77f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
@@ -80,7 +80,6 @@
             }
         }
         this.byteBuff = ByteBufAllocator.DEFAULT.buffer(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
-        byteBuff.retain();
         this.record = new GenericRecord<DCPRequest>();
         this.counter = counterStart;
     }
@@ -132,7 +131,7 @@
             if (nextDeleteKey != null) {
                 final String key = nextDeleteKey;
                 nextDeleteKey = null;
-                return new RemoveMessage(nextDeletePartition, key, cas++, seq++, 0L, bucket);
+                return new RemoveMessage(0, nextDeletePartition, key, cas++, seq++, 0L, bucket);
             }
         }
         generateNextDocument();
@@ -141,15 +140,16 @@
                 final String key = nextUpsertKey;
                 nextUpsertKey = null;
                 upsertCounter++;
-                return new MutationMessage(nextUpsertPartition, key, byteBuff, expiration++, seq++, 0, 0, lockTime++,
-                        cas++, bucket);
+                return new MutationMessage(byteBuff.readableBytes(), nextUpsertPartition, key, byteBuff, expiration++,
+                        seq++, 0, 0, lockTime++, cas++, bucket);
             }
         }
-        return new MutationMessage(assigned.get(counter % assigned.size()), generateKey(), byteBuff, expiration++,
-                seq++, 0, 0, lockTime++, cas++, bucket);
+        return new MutationMessage(byteBuff.readableBytes(), assigned.get(counter % assigned.size()), generateKey(),
+                byteBuff, expiration++, seq++, 0, 0, lockTime++, cas++, bucket);
     }
 
     private void generateNextDocument() {
+        byteBuff.retain();
         // reset the string
         strBuilder.setLength(0);
         strBuilder.append("{\"id\":" + (counter + upsertCounter) + ",\"name\":\""