Merge commit 'ba5322099c4559d41ec5530099d0d994aaadb339' from release-0.9.4-pre-rc

Change-Id: Ied547be3c7fa92d94948e9e1cbf2ed720c351b47
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/result/ResultPrinterTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/result/ResultPrinterTest.java
index 6810e19..7e851bf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/result/ResultPrinterTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/result/ResultPrinterTest.java
@@ -76,7 +76,7 @@
         try {
             // ensure result is valid json and error will be returned and not results.
             ResultExtractor.extract(IOUtils.toInputStream(resultStr, StandardCharsets.UTF_8));
-        } catch (AsterixException e) {
+        } catch (Exception e) {
             exceptionThrown = true;
             Assert.assertTrue(e.getMessage().contains(expectedException.getMessage()));
         }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
new file mode 100644
index 0000000..25ab530
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.hyracks.api.comm.FixedSizeFrame;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.comm.channels.NetworkInputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.dataflow.std.collectors.InputChannelFrameReader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class PartitionManagerTest {
+
+    protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        integrationUtil.init(true, TEST_CONFIG_FILE_NAME);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    @Test
+    public void failedJobPartitionRequestTest() throws Exception {
+        final NodeControllerService nc1 = integrationUtil.ncs[0];
+        final NodeControllerService nc2 = integrationUtil.ncs[1];
+        final JobId failedJob = new JobId(-1);
+        nc2.getPartitionManager().jobCompleted(failedJob, JobStatus.FAILURE);
+        final NetworkAddress localNetworkAddress = nc2.getNetworkManager().getPublicNetworkAddress();
+        final InetSocketAddress nc2Address =
+                new InetSocketAddress(localNetworkAddress.getAddress(), localNetworkAddress.getPort());
+        PartitionId id = new PartitionId(failedJob, new ConnectorDescriptorId(1), 0, 1);
+        NetworkInputChannel inputChannel = new NetworkInputChannel(nc1.getNetworkManager(), nc2Address, id, 1);
+        InputChannelFrameReader frameReader = new InputChannelFrameReader(inputChannel);
+        inputChannel.registerMonitor(frameReader);
+        AtomicBoolean failed = new AtomicBoolean(false);
+        Thread reader = new Thread(() -> {
+            try {
+                failed.set(!frameReader.nextFrame(new FixedSizeFrame()));
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+            }
+        });
+        reader.start();
+        final IHyracksCommonContext context = Mockito.mock(IHyracksCommonContext.class);
+        Mockito.when(context.getInitialFrameSize()).thenReturn(2000);
+        inputChannel.open(context);
+        reader.join(5000);
+        Assert.assertTrue(failed.get());
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index d93555d..43833a2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -173,14 +173,14 @@
         return IOUtils.toInputStream(resultBuilder.toString(), StandardCharsets.UTF_8);
     }
 
-    private static void checkForErrors(ObjectNode result) throws AsterixException {
+    private static void checkForErrors(ObjectNode result) throws Exception {
         final JsonNode errorsField = result.get(ResultField.ERRORS.getFieldName());
         if (errorsField != null) {
             final JsonNode errors = errorsField.get(0).get("msg");
             if (!result.get(ResultField.METRICS.getFieldName()).has("errorCount")) {
-                throw new AsterixException("Request reported error but not an errorCount");
+                throw new Exception("Request reported error but not an errorCount");
             }
-            throw new AsterixException(errors.asText());
+            throw new Exception(errors.asText());
         }
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.ddl.sqlpp
new file mode 100644
index 0000000..2812a2c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.ddl.sqlpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+create type tpch.OrderType as
+ closed {
+  o_orderkey : bigint,
+  o_custkey : bigint,
+  o_orderstatus : string,
+  o_totalprice : double,
+  o_orderdate : string,
+  o_orderpriority : string,
+  o_clerk : string,
+  o_shippriority : bigint,
+  o_comment : string
+};
+
+create dataset Orders(OrderType) primary key o_orderkey;
+
+create index OrdersIdx on Orders (o_custkey) type btree;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.2.query.sqlpp
new file mode 100644
index 0000000..1a78363
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.2.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+SELECT ds.DatasetName as v1
+FROM Metadata.`Dataset` ds
+WHERE ds.DatasetName LIKE "Orders%"
+UNION ALL
+SELECT ds.DatasetName v1, idx.DatasetName v2, idx.IndexName v3
+FROM Metadata.`Index` idx, Metadata.`Dataset` ds
+WHERE ds.DatasetName LIKE "Orders%" and idx.DatasetName LIKE "Orders%"
+ORDER BY v1, v2, v3
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2355/query-ASTERIXDB-2355.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2355/query-ASTERIXDB-2355.1.query.sqlpp
new file mode 100644
index 0000000..68d7f4a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2355/query-ASTERIXDB-2355.1.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* Invalid syntax */
+
+ %%%
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.adm
new file mode 100644
index 0000000..e2ba372
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.adm
@@ -0,0 +1,3 @@
+{ "v1": "Orders" }
+{ "v1": "Orders", "v2": "Orders", "v3": "Orders" }
+{ "v1": "Orders", "v2": "Orders", "v3": "OrdersIdx" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index eb90276..270d81a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -1896,7 +1896,7 @@
     <test-case FilePath="dml">
       <compilation-unit name="load-with-autogenerated-no-field">
         <output-dir compare="Text">load-with-autogenerated-no-field</output-dir>
-        <expected-error>org.apache.asterix.common.exceptions.AsterixException: ASX1014: Field "not_id" is not found</expected-error>
+        <expected-error>ASX1014: Field "not_id" is not found</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="dml">
@@ -3721,6 +3721,17 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="misc">
+      <compilation-unit name="query-ASTERIXDB-2354">
+        <output-dir compare="Text">query-ASTERIXDB-2354</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="misc">
+      <compilation-unit name="query-ASTERIXDB-2355">
+        <output-dir compare="Text">none</output-dir>
+        <expected-error><![CDATA[ASX1001: Syntax error: In line 22 >> %%%<< Encountered "%" at column 2.]]></expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="misc">
       <compilation-unit name="unsupported_parameter">
         <output-dir compare="Text">none</output-dir>
         <expected-error>Query parameter compiler.joinmem is not supported</expected-error>
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index ff3b253..6405621 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -74,6 +74,7 @@
 200 = External UDF cannot produce expected result. Please check the UDF configuration
 
 # Compile-time check errors
+1001 = Syntax error: %1$s
 1007 = Invalid expression: function %1$s expects its %2$s input parameter to be a %3$s expression, but the actual expression is %4$s
 1008 = Invalid parameter number: function %1$s cannot take %2$s parameters
 1010 = Phrase search in Full-text is not yet supported. Only one keyword per expression is permitted
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 5ca88a8..56c98bb 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -64,6 +64,7 @@
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.functions.FunctionConstants;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Expression;
@@ -296,9 +297,9 @@
             // this is here as the JavaCharStream that's below the lexer somtimes throws Errors that are not handled
             // by the ANTLR-generated lexer or parser (e.g it does this for invalid backslash u + 4 hex digits escapes)
             final String msg = e.getClass().getSimpleName() + (e.getMessage() != null ? ": " + e.getMessage() : "");
-            throw new CompilationException(new ParseException(msg));
+            throw new CompilationException(ErrorCode.PARSE_ERROR, msg);
         } catch (ParseException e) {
-            throw new CompilationException("Syntax error: " + getMessage(e));
+            throw new CompilationException(ErrorCode.PARSE_ERROR, getMessage(e));
         }
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
index 067579e..b6443c4 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
@@ -20,11 +20,15 @@
 
 import java.util.Arrays;
 
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
 public class AlgebricksAbsolutePartitionConstraint extends AlgebricksPartitionConstraint {
     private final String[] locations;
 
     public AlgebricksAbsolutePartitionConstraint(String[] locations) {
-        this.locations = locations;
+        this.locations = locations.clone();
+        Arrays.sort(locations);
     }
 
     @Override
@@ -38,7 +42,26 @@
 
     @Override
     public String toString() {
-        return Arrays.toString(locations);
+        return getPartitionConstraintType().toString() + ':' + Arrays.toString(locations);
     }
 
+    @Override
+    public AlgebricksPartitionConstraint compose(AlgebricksPartitionConstraint that) throws AlgebricksException {
+        switch (that.getPartitionConstraintType()) {
+            case COUNT:
+                AlgebricksCountPartitionConstraint thatCount = (AlgebricksCountPartitionConstraint) that;
+                if (locations.length <= thatCount.getCount()) {
+                    return this;
+                }
+                break;
+            case ABSOLUTE:
+                AlgebricksAbsolutePartitionConstraint thatAbsolute = (AlgebricksAbsolutePartitionConstraint) that;
+                if (Arrays.equals(locations, thatAbsolute.locations)) {
+                    return this;
+                }
+                break;
+        }
+
+        throw AlgebricksException.create(ErrorCode.CANNOT_COMPOSE_PART_CONSTRAINTS, toString(), that.toString());
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksCountPartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksCountPartitionConstraint.java
index fbafdee..2fc4804 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksCountPartitionConstraint.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksCountPartitionConstraint.java
@@ -18,6 +18,9 @@
  */
 package org.apache.hyracks.algebricks.common.constraints;
 
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
 public class AlgebricksCountPartitionConstraint extends AlgebricksPartitionConstraint {
 
     private final int count;
@@ -35,4 +38,21 @@
         return count;
     }
 
+    @Override
+    public String toString() {
+        return getPartitionConstraintType().toString() + ':' + count;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint compose(AlgebricksPartitionConstraint that) throws AlgebricksException {
+        switch (that.getPartitionConstraintType()) {
+            case COUNT:
+                AlgebricksCountPartitionConstraint thatCount = (AlgebricksCountPartitionConstraint) that;
+                return count <= thatCount.count ? this : that;
+            case ABSOLUTE:
+                return that.compose(this);
+        }
+
+        throw AlgebricksException.create(ErrorCode.CANNOT_COMPOSE_PART_CONSTRAINTS, toString(), that.toString());
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksPartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksPartitionConstraint.java
index ada1390..71575d5 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksPartitionConstraint.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksPartitionConstraint.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.algebricks.common.constraints;
 
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
 public abstract class AlgebricksPartitionConstraint {
     public enum PartitionConstraintType {
         ABSOLUTE,
@@ -25,4 +27,7 @@
     }
 
     public abstract PartitionConstraintType getPartitionConstraintType();
+
+    public abstract AlgebricksPartitionConstraint compose(AlgebricksPartitionConstraint that)
+            throws AlgebricksException;
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index 16992e7..526add1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -185,7 +185,8 @@
         return resultOps;
     }
 
-    private void setAllPartitionConstraints(Map<IConnectorDescriptor, TargetConstraint> tgtConstraints) {
+    private void setAllPartitionConstraints(Map<IConnectorDescriptor, TargetConstraint> tgtConstraints)
+            throws AlgebricksException {
         List<OperatorDescriptorId> roots = jobSpec.getRoots();
         setSpecifiedPartitionConstraints();
         for (OperatorDescriptorId rootId : roots) {
@@ -243,8 +244,8 @@
     }
 
     private void setPartitionConstraintsBottomup(OperatorDescriptorId opId,
-            Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp,
-            boolean finalPass) {
+            Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp, boolean finalPass)
+            throws AlgebricksException {
         List<IConnectorDescriptor> opInputs = jobSpec.getOperatorInputMap().get(opId);
         AlgebricksPartitionConstraint opConstraint = null;
         IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(opId);
@@ -260,10 +261,10 @@
                 if (constraint != null) {
                     switch (constraint) {
                         case ONE:
-                            opConstraint = countOneLocation;
+                            opConstraint = composePartitionConstraints(opConstraint, countOneLocation);
                             break;
                         case SAME_COUNT:
-                            opConstraint = partitionConstraintMap.get(src);
+                            opConstraint = composePartitionConstraints(opConstraint, partitionConstraintMap.get(src));
                             break;
                     }
                 }
@@ -439,4 +440,9 @@
         }
         return false;
     }
+
+    private static AlgebricksPartitionConstraint composePartitionConstraints(AlgebricksPartitionConstraint pc1,
+            AlgebricksPartitionConstraint pc2) throws AlgebricksException {
+        return pc1 == null ? pc2 : pc2 == null ? pc1 : pc1.compose(pc2);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 0691005..51afac1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -151,6 +151,7 @@
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
+    public static final int CANNOT_COMPOSE_PART_CONSTRAINTS = 10001;
 
     private static class Holder {
         private static final Map<Integer, String> errorMessageMap;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 1846062..452d379 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -133,3 +133,4 @@
 114 = Node (%1$s) is not active
 
 10000 = The given rule collection %1$s is not an instance of the List class.
+10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 126d9a4..3016a7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class NetworkOutputChannel implements IFrameWriter {
@@ -43,7 +44,7 @@
     public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
         this.ccb = ccb;
         this.nBuffers = nBuffers;
-        emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+        emptyStack = new ArrayDeque<>(nBuffers);
         ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
     }
 
@@ -58,7 +59,7 @@
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        ByteBuffer destBuffer = null;
+        ByteBuffer destBuffer;
         while (buffer.hasRemaining()) {
             synchronized (this) {
                 while (true) {
@@ -76,6 +77,7 @@
                     try {
                         wait();
                     } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
                         throw HyracksDataException.create(e);
                     }
                 }
@@ -94,7 +96,7 @@
 
     @Override
     public void fail() throws HyracksDataException {
-        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
     }
 
     @Override
@@ -103,7 +105,7 @@
     }
 
     public void abort() {
-        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
         synchronized (NetworkOutputChannel.this) {
             aborted = true;
             NetworkOutputChannel.this.notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index d7ed47d..c962029 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -87,5 +87,9 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 76b5c8c..6a7d645 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -665,6 +665,7 @@
     }
 
     public void notifyTasksCompleted(CcId ccId) throws Exception {
+        partitionManager.jobsCompleted(ccId);
         application.onRegisterNode(ccId);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index f3276a4..cfe0991 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -129,12 +128,7 @@
                 LOGGER.debug("Received initial partition request: " + pid + " on channel: " + ccb);
             }
             noc = new NetworkOutputChannel(ccb, nBuffers);
-            try {
-                partitionManager.registerPartitionRequest(pid, noc);
-            } catch (HyracksException e) {
-                e.printStackTrace();
-                noc.abort();
-            }
+            partitionManager.registerPartitionRequest(pid, noc);
         }
 
         @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 9ee4a9e..d023ce9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -19,20 +19,22 @@
 package org.apache.hyracks.control.nc.partitions;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.IPartition;
 import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
 import org.apache.hyracks.control.common.job.PartitionState;
@@ -40,6 +42,9 @@
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
 import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 public class PartitionManager {
 
     private final NodeControllerService ncs;
@@ -52,11 +57,14 @@
 
     private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<>();
 
+    private final Cache<JobId, JobId> failedJobsCache;
+
     public PartitionManager(NodeControllerService ncs) {
         this.ncs = ncs;
         this.availablePartitionMap = new HashMap<>();
         this.deallocatableRegistry = new DefaultDeallocatableRegistry();
         this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
+        failedJobsCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
     }
 
     public synchronized void registerPartition(PartitionId pid, CcId ccId, TaskAttemptId taId, IPartition partition,
@@ -95,37 +103,20 @@
         return availablePartitionMap.get(pid).get(0);
     }
 
-    public synchronized void unregisterPartitions(JobId jobId, Collection<IPartition> unregisteredPartitions) {
-        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
-                .hasNext();) {
-            Map.Entry<PartitionId, List<IPartition>> e = i.next();
-            PartitionId pid = e.getKey();
-            if (jobId.equals(pid.getJobId())) {
-                for (IPartition p : e.getValue()) {
-                    unregisteredPartitions.add(p);
-                }
-                i.remove();
-            }
+    public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer) {
+        if (failedJobsCache.getIfPresent(partitionId.getJobId()) != null) {
+            writer.abort();
         }
-    }
-
-    public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
-            throws HyracksException {
-        try {
-            List<IPartition> pList = availablePartitionMap.get(partitionId);
-            if (pList != null && !pList.isEmpty()) {
-                IPartition partition = pList.get(0);
-                writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
-                partition.writeTo(writer);
-                if (!partition.isReusable()) {
-                    availablePartitionMap.remove(partitionId);
-                }
-            } else {
-                //throw new HyracksException("Request for unknown partition " + partitionId);
-                partitionRequests.put(partitionId, writer);
+        List<IPartition> pList = availablePartitionMap.get(partitionId);
+        if (pList != null && !pList.isEmpty()) {
+            IPartition partition = pList.get(0);
+            writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
+            partition.writeTo(writer);
+            if (!partition.isReusable()) {
+                availablePartitionMap.remove(partitionId);
             }
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+        } else {
+            partitionRequests.put(partitionId, writer);
         }
     }
 
@@ -137,7 +128,25 @@
         deallocatableRegistry.close();
     }
 
-    public void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
+    public synchronized void jobCompleted(JobId jobId, JobStatus status) {
+        if (status == JobStatus.FAILURE) {
+            failedJobsCache.put(jobId, jobId);
+        }
+        final List<IPartition> jobPartitions = unregisterPartitions(jobId);
+        final List<NetworkOutputChannel> pendingRequests = removePendingRequests(jobId, status);
+        if (!jobPartitions.isEmpty() || !pendingRequests.isEmpty()) {
+            ncs.getExecutor().execute(() -> {
+                jobPartitions.forEach(IDeallocatable::deallocate);
+                pendingRequests.forEach(NetworkOutputChannel::abort);
+            });
+        }
+    }
+
+    public synchronized void jobsCompleted(CcId ccId) {
+        failedJobsCache.asMap().keySet().removeIf(jobId -> jobId.getCcId().equals(ccId));
+    }
+
+    private void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
             PartitionState state) throws HyracksDataException {
         PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
         desc.setState(state);
@@ -147,4 +156,36 @@
             throw HyracksDataException.create(e);
         }
     }
-}
+
+    private List<IPartition> unregisterPartitions(JobId jobId) {
+        final List<IPartition> unregisteredPartitions = new ArrayList<>();
+        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
+                .hasNext();) {
+            Map.Entry<PartitionId, List<IPartition>> entry = i.next();
+            PartitionId pid = entry.getKey();
+            if (jobId.equals(pid.getJobId())) {
+                unregisteredPartitions.addAll(entry.getValue());
+                i.remove();
+            }
+        }
+        return unregisteredPartitions;
+    }
+
+    private List<NetworkOutputChannel> removePendingRequests(JobId jobId, JobStatus status) {
+        if (status != JobStatus.FAILURE) {
+            return Collections.emptyList();
+        }
+        final List<NetworkOutputChannel> pendingRequests = new ArrayList<>();
+        final Iterator<Map.Entry<PartitionId, NetworkOutputChannel>> requestsIterator =
+                partitionRequests.entrySet().iterator();
+        while (requestsIterator.hasNext()) {
+            final Map.Entry<PartitionId, NetworkOutputChannel> entry = requestsIterator.next();
+            final PartitionId partitionId = entry.getKey();
+            if (partitionId.getJobId().equals(jobId)) {
+                pendingRequests.add(entry.getValue());
+                requestsIterator.remove();
+            }
+        }
+        return pendingRequests;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index d38cd5e..c5a9d73 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -18,17 +18,13 @@
  */
 package org.apache.hyracks.control.nc.work;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.api.partitions.IPartition;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.Joblet;
 import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -53,23 +49,7 @@
             LOGGER.info("Cleaning up after job: " + jobId);
         }
         ncs.removeJobParameterByteStore(jobId);
-        final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
-        ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
-        ncs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                for (IPartition p : unregisteredPartitions) {
-                    try {
-                        // Put deallocate in a try block to make sure that every IPartition is de-allocated.
-                        p.deallocate();
-                    } catch (Exception e) {
-                        if (LOGGER.isWarnEnabled()) {
-                            LOGGER.log(Level.WARN, e.getMessage(), e);
-                        }
-                    }
-                }
-            }
-        });
+        ncs.getPartitionManager().jobCompleted(jobId, status);;
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet joblet = jobletMap.remove(jobId);
         if (joblet != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 0b548f6..28c1a71 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -31,6 +31,7 @@
 
 public abstract class AbstractChannelWriteInterface implements IChannelWriteInterface {
 
+    public static final int REMOTE_WRITE_ERROR_CODE = 1;
     private static final Logger LOGGER = LogManager.getLogger();
     protected final IChannelControlBlock ccb;
     protected final Queue<ByteBuffer> wiFullQueue;
@@ -135,7 +136,9 @@
                     return;
                 }
                 eos = true;
-                adjustChannelWritability();
+                if (ecode != REMOTE_WRITE_ERROR_CODE) {
+                    adjustChannelWritability();
+                }
             }
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 286320b..a7fa49e 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
+import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -67,28 +68,7 @@
 
     MultiplexedConnection(MuxDemux muxDemux) {
         this.muxDemux = muxDemux;
-        pendingWriteEventsCounter = new IEventCounter() {
-            private int counter;
-
-            @Override
-            public synchronized void increment() {
-                ++counter;
-                if (counter == 1) {
-                    tcpConnection.enable(SelectionKey.OP_WRITE);
-                }
-            }
-
-            @Override
-            public synchronized void decrement() {
-                --counter;
-                if (counter == 0) {
-                    tcpConnection.disable(SelectionKey.OP_WRITE);
-                }
-                if (counter < 0) {
-                    throw new IllegalStateException();
-                }
-            }
-        };
+        pendingWriteEventsCounter = new EventCounter();
         cSet = new ChannelSet(this, pendingWriteEventsCounter);
         readerState = new ReaderState();
         writerState = new WriterState();
@@ -429,4 +409,32 @@
     public IChannelInterfaceFactory getChannelInterfaceFactory() {
         return muxDemux.getChannelInterfaceFactory();
     }
+
+    @ThreadSafetyGuaranteedBy("MultiplexedConnection.this")
+    private class EventCounter implements IEventCounter {
+        private int counter;
+
+        @Override
+        public synchronized void increment() {
+            if (!connectionFailure) {
+                ++counter;
+                if (counter == 1) {
+                    tcpConnection.enable(SelectionKey.OP_WRITE);
+                }
+            }
+        }
+
+        @Override
+        public synchronized void decrement() {
+            if (!connectionFailure) {
+                --counter;
+                if (counter == 0) {
+                    tcpConnection.disable(SelectionKey.OP_WRITE);
+                }
+                if (counter < 0) {
+                    throw new IllegalStateException();
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index c12909c..c58cb86 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -111,7 +111,9 @@
             @Override
             public void connectionClosed(TCPConnection connection) {
                 synchronized (MuxDemux.this) {
-                    connectionMap.remove(connection.getRemoteAddress());
+                    if (connection.getType() == TCPConnection.ConnectionType.OUTGOING) {
+                        connectionMap.remove(connection.getRemoteAddress());
+                    }
                 }
             }
         }, nThreads);
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
index b0e2eed..ff4627a 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
@@ -29,6 +29,11 @@
 
 public class TCPConnection {
 
+    public enum ConnectionType {
+        INCOMING,
+        OUTGOING
+    }
+
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final TCPEndpoint endpoint;
@@ -43,11 +48,15 @@
 
     private Object attachment;
 
-    public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector) {
+    private ConnectionType type;
+
+    public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector,
+            ConnectionType type) {
         this.endpoint = endpoint;
         this.channel = channel;
         this.key = key;
         this.selector = selector;
+        this.type = type;
         remoteAddress = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
     }
 
@@ -102,6 +111,10 @@
         }
     }
 
+    public ConnectionType getType() {
+        return type;
+    }
+
     @Override
     public String toString() {
         return "TCPConnection[Remote Address: " + remoteAddress + " Local Address: " + endpoint.getLocalAddress() + "]";
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index affa59e..05e2175 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.net.protocols.tcp;
 
+import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -160,7 +162,8 @@
                         for (SocketChannel channel : workingIncomingConnections) {
                             register(channel);
                             SelectionKey sKey = channel.register(selector, 0);
-                            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
+                            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector,
+                                    ConnectionType.INCOMING);
                             sKey.attach(connection);
                             synchronized (connectionListener) {
                                 connectionListener.acceptedConnection(connection);
@@ -220,7 +223,8 @@
         }
 
         private void createConnection(SelectionKey key, SocketChannel channel) {
-            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, key, selector);
+            TCPConnection connection =
+                    new TCPConnection(TCPEndpoint.this, channel, key, selector, ConnectionType.OUTGOING);
             key.attach(connection);
             key.interestOps(0);
             synchronized (connectionListener) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
index 1209e17..3520e3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
@@ -162,7 +163,16 @@
         searchCallback = lsmInitialState.getSearchOperationCallback();
         predicate = (RangePredicate) lsmInitialState.getSearchPredicate();
         numBTrees = operationalComponents.size();
-        if (btreeCursors == null || btreeCursors.length != numBTrees) {
+        if (btreeCursors != null && btreeCursors.length != numBTrees) {
+            Throwable failure = CleanupUtils.destroy(null, btreeCursors);
+            btreeCursors = null;
+            failure = CleanupUtils.destroy(failure, btreeAccessors);
+            btreeAccessors = null;
+            if (failure != null) {
+                throw HyracksDataException.create(failure);
+            }
+        }
+        if (btreeCursors == null) {
             // object creation: should be relatively low
             btreeCursors = new ITreeIndexCursor[numBTrees];
             btreeAccessors = new BTreeAccessor[numBTrees];
@@ -175,8 +185,13 @@
             BTree btree = (BTree) component.getIndex();
             if (component.getType() == LSMComponentType.MEMORY) {
                 includeMutableComponent = true;
-                bloomFilters[i] = null;
+                if (bloomFilters[i] != null) {
+                    destroyAndNullifyCursorAtIndex(i);
+                }
             } else {
+                if (bloomFilters[i] == null) {
+                    destroyAndNullifyCursorAtIndex(i);
+                }
                 bloomFilters[i] = ((LSMBTreeWithBloomFilterDiskComponent) component).getBloomFilter();
             }
 
@@ -193,6 +208,18 @@
         foundTuple = false;
     }
 
+    private void destroyAndNullifyCursorAtIndex(int i) throws HyracksDataException {
+        // component at location i was a disk component before, and is now a memory component, or vise versa
+        bloomFilters[i] = null;
+        Throwable failure = CleanupUtils.destroy(null, btreeCursors[i]);
+        btreeCursors[i] = null;
+        failure = CleanupUtils.destroy(failure, btreeAccessors[i]);
+        btreeAccessors[i] = null;
+        if (failure != null) {
+            throw HyracksDataException.create(failure);
+        }
+    }
+
     @Override
     public void doNext() throws HyracksDataException {
         nextHasBeenCalled = true;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index bfda985..a675047 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -47,10 +47,9 @@
 public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
     private final ArrayTupleReference copyTuple;
     private final RangePredicate reusablePred;
-
     private ISearchOperationCallback searchCallback;
-
     private BTreeAccessor[] btreeAccessors;
+    private boolean[] isMemoryComponent;
     private ArrayTupleBuilder tupleBuilder;
     private boolean canCallProceed = true;
     private boolean resultOfSearchCallbackProceed = false;
@@ -340,15 +339,19 @@
             // object creation: should be relatively low
             rangeCursors = new IIndexCursor[numBTrees];
             btreeAccessors = new BTreeAccessor[numBTrees];
+            isMemoryComponent = new boolean[numBTrees];
         } else if (rangeCursors.length != numBTrees) {
             // should destroy first
             Throwable failure = CleanupUtils.destroy(null, btreeAccessors);
+            btreeAccessors = null;
             failure = CleanupUtils.destroy(failure, rangeCursors);
+            rangeCursors = null;
             if (failure != null) {
                 throw HyracksDataException.create(failure);
             }
             rangeCursors = new IIndexCursor[numBTrees];
             btreeAccessors = new BTreeAccessor[numBTrees];
+            isMemoryComponent = new boolean[numBTrees];
         }
         for (int i = 0; i < numBTrees; i++) {
             ILSMComponent component = operationalComponents.get(i);
@@ -357,7 +360,7 @@
                 includeMutableComponent = true;
             }
             btree = (BTree) component.getIndex();
-            if (btreeAccessors[i] == null) {
+            if (btreeAccessors[i] == null || destroyIncompatible(component, i)) {
                 btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
                 rangeCursors[i] = btreeAccessors[i].createSearchCursor(false);
             } else {
@@ -365,6 +368,7 @@
                 btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
                 rangeCursors[i].close();
             }
+            isMemoryComponent[i] = component.getType() == LSMComponentType.MEMORY;
         }
         IndexCursorUtils.open(btreeAccessors, rangeCursors, searchPred);
         try {
@@ -377,6 +381,22 @@
         }
     }
 
+    private boolean destroyIncompatible(ILSMComponent component, int index) throws HyracksDataException {
+        // exclusive or. if the component is memory and the previous one at that index was a disk component
+        // or vice versa, then we should destroy the cursor and accessor since they need to be recreated
+        if (component.getType() == LSMComponentType.MEMORY ^ isMemoryComponent[index]) {
+            Throwable failure = CleanupUtils.destroy(null, btreeAccessors[index]);
+            btreeAccessors[index] = null;
+            failure = CleanupUtils.destroy(failure, rangeCursors[index]);
+            rangeCursors[index] = null;
+            if (failure != null) {
+                throw HyracksDataException.create(failure);
+            }
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public boolean getSearchOperationCallbackProceedResult() {
         return resultOfSearchCallbackProceed;