Merge commit 'ba5322099c4559d41ec5530099d0d994aaadb339' from release-0.9.4-pre-rc
Change-Id: Ied547be3c7fa92d94948e9e1cbf2ed720c351b47
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/result/ResultPrinterTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/result/ResultPrinterTest.java
index 6810e19..7e851bf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/result/ResultPrinterTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/result/ResultPrinterTest.java
@@ -76,7 +76,7 @@
try {
// ensure result is valid json and error will be returned and not results.
ResultExtractor.extract(IOUtils.toInputStream(resultStr, StandardCharsets.UTF_8));
- } catch (AsterixException e) {
+ } catch (Exception e) {
exceptionThrown = true;
Assert.assertTrue(e.getMessage().contains(expectedException.getMessage()));
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
new file mode 100644
index 0000000..25ab530
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.hyracks.api.comm.FixedSizeFrame;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.comm.channels.NetworkInputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.dataflow.std.collectors.InputChannelFrameReader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class PartitionManagerTest {
+
+ protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+ private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+ integrationUtil.init(true, TEST_CONFIG_FILE_NAME);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ integrationUtil.deinit(true);
+ }
+
+ @Test
+ public void failedJobPartitionRequestTest() throws Exception {
+ final NodeControllerService nc1 = integrationUtil.ncs[0];
+ final NodeControllerService nc2 = integrationUtil.ncs[1];
+ final JobId failedJob = new JobId(-1);
+ nc2.getPartitionManager().jobCompleted(failedJob, JobStatus.FAILURE);
+ final NetworkAddress localNetworkAddress = nc2.getNetworkManager().getPublicNetworkAddress();
+ final InetSocketAddress nc2Address =
+ new InetSocketAddress(localNetworkAddress.getAddress(), localNetworkAddress.getPort());
+ PartitionId id = new PartitionId(failedJob, new ConnectorDescriptorId(1), 0, 1);
+ NetworkInputChannel inputChannel = new NetworkInputChannel(nc1.getNetworkManager(), nc2Address, id, 1);
+ InputChannelFrameReader frameReader = new InputChannelFrameReader(inputChannel);
+ inputChannel.registerMonitor(frameReader);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ Thread reader = new Thread(() -> {
+ try {
+ failed.set(!frameReader.nextFrame(new FixedSizeFrame()));
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ }
+ });
+ reader.start();
+ final IHyracksCommonContext context = Mockito.mock(IHyracksCommonContext.class);
+ Mockito.when(context.getInitialFrameSize()).thenReturn(2000);
+ inputChannel.open(context);
+ reader.join(5000);
+ Assert.assertTrue(failed.get());
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index d93555d..43833a2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -173,14 +173,14 @@
return IOUtils.toInputStream(resultBuilder.toString(), StandardCharsets.UTF_8);
}
- private static void checkForErrors(ObjectNode result) throws AsterixException {
+ private static void checkForErrors(ObjectNode result) throws Exception {
final JsonNode errorsField = result.get(ResultField.ERRORS.getFieldName());
if (errorsField != null) {
final JsonNode errors = errorsField.get(0).get("msg");
if (!result.get(ResultField.METRICS.getFieldName()).has("errorCount")) {
- throw new AsterixException("Request reported error but not an errorCount");
+ throw new Exception("Request reported error but not an errorCount");
}
- throw new AsterixException(errors.asText());
+ throw new Exception(errors.asText());
}
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.ddl.sqlpp
new file mode 100644
index 0000000..2812a2c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.ddl.sqlpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+create type tpch.OrderType as
+ closed {
+ o_orderkey : bigint,
+ o_custkey : bigint,
+ o_orderstatus : string,
+ o_totalprice : double,
+ o_orderdate : string,
+ o_orderpriority : string,
+ o_clerk : string,
+ o_shippriority : bigint,
+ o_comment : string
+};
+
+create dataset Orders(OrderType) primary key o_orderkey;
+
+create index OrdersIdx on Orders (o_custkey) type btree;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.2.query.sqlpp
new file mode 100644
index 0000000..1a78363
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.2.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+SELECT ds.DatasetName as v1
+FROM Metadata.`Dataset` ds
+WHERE ds.DatasetName LIKE "Orders%"
+UNION ALL
+SELECT ds.DatasetName v1, idx.DatasetName v2, idx.IndexName v3
+FROM Metadata.`Index` idx, Metadata.`Dataset` ds
+WHERE ds.DatasetName LIKE "Orders%" and idx.DatasetName LIKE "Orders%"
+ORDER BY v1, v2, v3
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2355/query-ASTERIXDB-2355.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2355/query-ASTERIXDB-2355.1.query.sqlpp
new file mode 100644
index 0000000..68d7f4a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-2355/query-ASTERIXDB-2355.1.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* Invalid syntax */
+
+ %%%
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.adm
new file mode 100644
index 0000000..e2ba372
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-2354/query-ASTERIXDB-2354.1.adm
@@ -0,0 +1,3 @@
+{ "v1": "Orders" }
+{ "v1": "Orders", "v2": "Orders", "v3": "Orders" }
+{ "v1": "Orders", "v2": "Orders", "v3": "OrdersIdx" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index eb90276..270d81a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -1896,7 +1896,7 @@
<test-case FilePath="dml">
<compilation-unit name="load-with-autogenerated-no-field">
<output-dir compare="Text">load-with-autogenerated-no-field</output-dir>
- <expected-error>org.apache.asterix.common.exceptions.AsterixException: ASX1014: Field "not_id" is not found</expected-error>
+ <expected-error>ASX1014: Field "not_id" is not found</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="dml">
@@ -3721,6 +3721,17 @@
</compilation-unit>
</test-case>
<test-case FilePath="misc">
+ <compilation-unit name="query-ASTERIXDB-2354">
+ <output-dir compare="Text">query-ASTERIXDB-2354</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="misc">
+ <compilation-unit name="query-ASTERIXDB-2355">
+ <output-dir compare="Text">none</output-dir>
+ <expected-error><![CDATA[ASX1001: Syntax error: In line 22 >> %%%<< Encountered "%" at column 2.]]></expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="misc">
<compilation-unit name="unsupported_parameter">
<output-dir compare="Text">none</output-dir>
<expected-error>Query parameter compiler.joinmem is not supported</expected-error>
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index ff3b253..6405621 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -74,6 +74,7 @@
200 = External UDF cannot produce expected result. Please check the UDF configuration
# Compile-time check errors
+1001 = Syntax error: %1$s
1007 = Invalid expression: function %1$s expects its %2$s input parameter to be a %3$s expression, but the actual expression is %4$s
1008 = Invalid parameter number: function %1$s cannot take %2$s parameters
1010 = Phrase search in Full-text is not yet supported. Only one keyword per expression is permitted
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 5ca88a8..56c98bb 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -64,6 +64,7 @@
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.functions.FunctionConstants;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.common.base.Expression;
@@ -296,9 +297,9 @@
// this is here as the JavaCharStream that's below the lexer somtimes throws Errors that are not handled
// by the ANTLR-generated lexer or parser (e.g it does this for invalid backslash u + 4 hex digits escapes)
final String msg = e.getClass().getSimpleName() + (e.getMessage() != null ? ": " + e.getMessage() : "");
- throw new CompilationException(new ParseException(msg));
+ throw new CompilationException(ErrorCode.PARSE_ERROR, msg);
} catch (ParseException e) {
- throw new CompilationException("Syntax error: " + getMessage(e));
+ throw new CompilationException(ErrorCode.PARSE_ERROR, getMessage(e));
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
index 067579e..b6443c4 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
@@ -20,11 +20,15 @@
import java.util.Arrays;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
public class AlgebricksAbsolutePartitionConstraint extends AlgebricksPartitionConstraint {
private final String[] locations;
public AlgebricksAbsolutePartitionConstraint(String[] locations) {
- this.locations = locations;
+ this.locations = locations.clone();
+ Arrays.sort(locations);
}
@Override
@@ -38,7 +42,26 @@
@Override
public String toString() {
- return Arrays.toString(locations);
+ return getPartitionConstraintType().toString() + ':' + Arrays.toString(locations);
}
+ @Override
+ public AlgebricksPartitionConstraint compose(AlgebricksPartitionConstraint that) throws AlgebricksException {
+ switch (that.getPartitionConstraintType()) {
+ case COUNT:
+ AlgebricksCountPartitionConstraint thatCount = (AlgebricksCountPartitionConstraint) that;
+ if (locations.length <= thatCount.getCount()) {
+ return this;
+ }
+ break;
+ case ABSOLUTE:
+ AlgebricksAbsolutePartitionConstraint thatAbsolute = (AlgebricksAbsolutePartitionConstraint) that;
+ if (Arrays.equals(locations, thatAbsolute.locations)) {
+ return this;
+ }
+ break;
+ }
+
+ throw AlgebricksException.create(ErrorCode.CANNOT_COMPOSE_PART_CONSTRAINTS, toString(), that.toString());
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksCountPartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksCountPartitionConstraint.java
index fbafdee..2fc4804 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksCountPartitionConstraint.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksCountPartitionConstraint.java
@@ -18,6 +18,9 @@
*/
package org.apache.hyracks.algebricks.common.constraints;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
public class AlgebricksCountPartitionConstraint extends AlgebricksPartitionConstraint {
private final int count;
@@ -35,4 +38,21 @@
return count;
}
+ @Override
+ public String toString() {
+ return getPartitionConstraintType().toString() + ':' + count;
+ }
+
+ @Override
+ public AlgebricksPartitionConstraint compose(AlgebricksPartitionConstraint that) throws AlgebricksException {
+ switch (that.getPartitionConstraintType()) {
+ case COUNT:
+ AlgebricksCountPartitionConstraint thatCount = (AlgebricksCountPartitionConstraint) that;
+ return count <= thatCount.count ? this : that;
+ case ABSOLUTE:
+ return that.compose(this);
+ }
+
+ throw AlgebricksException.create(ErrorCode.CANNOT_COMPOSE_PART_CONSTRAINTS, toString(), that.toString());
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksPartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksPartitionConstraint.java
index ada1390..71575d5 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksPartitionConstraint.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksPartitionConstraint.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.algebricks.common.constraints;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
public abstract class AlgebricksPartitionConstraint {
public enum PartitionConstraintType {
ABSOLUTE,
@@ -25,4 +27,7 @@
}
public abstract PartitionConstraintType getPartitionConstraintType();
+
+ public abstract AlgebricksPartitionConstraint compose(AlgebricksPartitionConstraint that)
+ throws AlgebricksException;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index 16992e7..526add1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -185,7 +185,8 @@
return resultOps;
}
- private void setAllPartitionConstraints(Map<IConnectorDescriptor, TargetConstraint> tgtConstraints) {
+ private void setAllPartitionConstraints(Map<IConnectorDescriptor, TargetConstraint> tgtConstraints)
+ throws AlgebricksException {
List<OperatorDescriptorId> roots = jobSpec.getRoots();
setSpecifiedPartitionConstraints();
for (OperatorDescriptorId rootId : roots) {
@@ -243,8 +244,8 @@
}
private void setPartitionConstraintsBottomup(OperatorDescriptorId opId,
- Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp,
- boolean finalPass) {
+ Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp, boolean finalPass)
+ throws AlgebricksException {
List<IConnectorDescriptor> opInputs = jobSpec.getOperatorInputMap().get(opId);
AlgebricksPartitionConstraint opConstraint = null;
IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(opId);
@@ -260,10 +261,10 @@
if (constraint != null) {
switch (constraint) {
case ONE:
- opConstraint = countOneLocation;
+ opConstraint = composePartitionConstraints(opConstraint, countOneLocation);
break;
case SAME_COUNT:
- opConstraint = partitionConstraintMap.get(src);
+ opConstraint = composePartitionConstraints(opConstraint, partitionConstraintMap.get(src));
break;
}
}
@@ -439,4 +440,9 @@
}
return false;
}
+
+ private static AlgebricksPartitionConstraint composePartitionConstraints(AlgebricksPartitionConstraint pc1,
+ AlgebricksPartitionConstraint pc2) throws AlgebricksException {
+ return pc1 == null ? pc2 : pc2 == null ? pc1 : pc1.compose(pc2);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 0691005..51afac1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -151,6 +151,7 @@
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
+ public static final int CANNOT_COMPOSE_PART_CONSTRAINTS = 10001;
private static class Holder {
private static final Map<Integer, String> errorMessageMap;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 1846062..452d379 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -133,3 +133,4 @@
114 = Node (%1$s) is not active
10000 = The given rule collection %1$s is not an instance of the List class.
+10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 126d9a4..3016a7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.comm.IBufferAcceptor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
public class NetworkOutputChannel implements IFrameWriter {
@@ -43,7 +44,7 @@
public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
this.ccb = ccb;
this.nBuffers = nBuffers;
- emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+ emptyStack = new ArrayDeque<>(nBuffers);
ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
}
@@ -58,7 +59,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- ByteBuffer destBuffer = null;
+ ByteBuffer destBuffer;
while (buffer.hasRemaining()) {
synchronized (this) {
while (true) {
@@ -76,6 +77,7 @@
try {
wait();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
}
}
@@ -94,7 +96,7 @@
@Override
public void fail() throws HyracksDataException {
- ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
}
@Override
@@ -103,7 +105,7 @@
}
public void abort() {
- ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
synchronized (NetworkOutputChannel.this) {
aborted = true;
NetworkOutputChannel.this.notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index d7ed47d..c962029 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -87,5 +87,9 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 76b5c8c..6a7d645 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -665,6 +665,7 @@
}
public void notifyTasksCompleted(CcId ccId) throws Exception {
+ partitionManager.jobsCompleted(ccId);
application.onRegisterNode(ccId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index f3276a4..cfe0991 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -27,7 +27,6 @@
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
-import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.exceptions.NetException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.partitions.PartitionId;
@@ -129,12 +128,7 @@
LOGGER.debug("Received initial partition request: " + pid + " on channel: " + ccb);
}
noc = new NetworkOutputChannel(ccb, nBuffers);
- try {
- partitionManager.registerPartitionRequest(pid, noc);
- } catch (HyracksException e) {
- e.printStackTrace();
- noc.abort();
- }
+ partitionManager.registerPartitionRequest(pid, noc);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 9ee4a9e..d023ce9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -19,20 +19,22 @@
package org.apache.hyracks.control.nc.partitions;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.partitions.IPartition;
import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
import org.apache.hyracks.control.common.job.PartitionDescriptor;
import org.apache.hyracks.control.common.job.PartitionState;
@@ -40,6 +42,9 @@
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
public class PartitionManager {
private final NodeControllerService ncs;
@@ -52,11 +57,14 @@
private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<>();
+ private final Cache<JobId, JobId> failedJobsCache;
+
public PartitionManager(NodeControllerService ncs) {
this.ncs = ncs;
this.availablePartitionMap = new HashMap<>();
this.deallocatableRegistry = new DefaultDeallocatableRegistry();
this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
+ failedJobsCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
}
public synchronized void registerPartition(PartitionId pid, CcId ccId, TaskAttemptId taId, IPartition partition,
@@ -95,37 +103,20 @@
return availablePartitionMap.get(pid).get(0);
}
- public synchronized void unregisterPartitions(JobId jobId, Collection<IPartition> unregisteredPartitions) {
- for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
- .hasNext();) {
- Map.Entry<PartitionId, List<IPartition>> e = i.next();
- PartitionId pid = e.getKey();
- if (jobId.equals(pid.getJobId())) {
- for (IPartition p : e.getValue()) {
- unregisteredPartitions.add(p);
- }
- i.remove();
- }
+ public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer) {
+ if (failedJobsCache.getIfPresent(partitionId.getJobId()) != null) {
+ writer.abort();
}
- }
-
- public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
- throws HyracksException {
- try {
- List<IPartition> pList = availablePartitionMap.get(partitionId);
- if (pList != null && !pList.isEmpty()) {
- IPartition partition = pList.get(0);
- writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
- partition.writeTo(writer);
- if (!partition.isReusable()) {
- availablePartitionMap.remove(partitionId);
- }
- } else {
- //throw new HyracksException("Request for unknown partition " + partitionId);
- partitionRequests.put(partitionId, writer);
+ List<IPartition> pList = availablePartitionMap.get(partitionId);
+ if (pList != null && !pList.isEmpty()) {
+ IPartition partition = pList.get(0);
+ writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
+ partition.writeTo(writer);
+ if (!partition.isReusable()) {
+ availablePartitionMap.remove(partitionId);
}
- } catch (Exception e) {
- throw HyracksDataException.create(e);
+ } else {
+ partitionRequests.put(partitionId, writer);
}
}
@@ -137,7 +128,25 @@
deallocatableRegistry.close();
}
- public void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
+ public synchronized void jobCompleted(JobId jobId, JobStatus status) {
+ if (status == JobStatus.FAILURE) {
+ failedJobsCache.put(jobId, jobId);
+ }
+ final List<IPartition> jobPartitions = unregisterPartitions(jobId);
+ final List<NetworkOutputChannel> pendingRequests = removePendingRequests(jobId, status);
+ if (!jobPartitions.isEmpty() || !pendingRequests.isEmpty()) {
+ ncs.getExecutor().execute(() -> {
+ jobPartitions.forEach(IDeallocatable::deallocate);
+ pendingRequests.forEach(NetworkOutputChannel::abort);
+ });
+ }
+ }
+
+ public synchronized void jobsCompleted(CcId ccId) {
+ failedJobsCache.asMap().keySet().removeIf(jobId -> jobId.getCcId().equals(ccId));
+ }
+
+ private void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
PartitionState state) throws HyracksDataException {
PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
desc.setState(state);
@@ -147,4 +156,36 @@
throw HyracksDataException.create(e);
}
}
-}
+
+ private List<IPartition> unregisterPartitions(JobId jobId) {
+ final List<IPartition> unregisteredPartitions = new ArrayList<>();
+ for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
+ .hasNext();) {
+ Map.Entry<PartitionId, List<IPartition>> entry = i.next();
+ PartitionId pid = entry.getKey();
+ if (jobId.equals(pid.getJobId())) {
+ unregisteredPartitions.addAll(entry.getValue());
+ i.remove();
+ }
+ }
+ return unregisteredPartitions;
+ }
+
+ private List<NetworkOutputChannel> removePendingRequests(JobId jobId, JobStatus status) {
+ if (status != JobStatus.FAILURE) {
+ return Collections.emptyList();
+ }
+ final List<NetworkOutputChannel> pendingRequests = new ArrayList<>();
+ final Iterator<Map.Entry<PartitionId, NetworkOutputChannel>> requestsIterator =
+ partitionRequests.entrySet().iterator();
+ while (requestsIterator.hasNext()) {
+ final Map.Entry<PartitionId, NetworkOutputChannel> entry = requestsIterator.next();
+ final PartitionId partitionId = entry.getKey();
+ if (partitionId.getJobId().equals(jobId)) {
+ pendingRequests.add(entry.getValue());
+ requestsIterator.remove();
+ }
+ }
+ return pendingRequests;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index d38cd5e..c5a9d73 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -18,17 +18,13 @@
*/
package org.apache.hyracks.control.nc.work;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.api.partitions.IPartition;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.Joblet;
import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -53,23 +49,7 @@
LOGGER.info("Cleaning up after job: " + jobId);
}
ncs.removeJobParameterByteStore(jobId);
- final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
- ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
- ncs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- for (IPartition p : unregisteredPartitions) {
- try {
- // Put deallocate in a try block to make sure that every IPartition is de-allocated.
- p.deallocate();
- } catch (Exception e) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.log(Level.WARN, e.getMessage(), e);
- }
- }
- }
- }
- });
+ ncs.getPartitionManager().jobCompleted(jobId, status);
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet joblet = jobletMap.remove(jobId);
if (joblet != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 0b548f6..28c1a71 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -31,6 +31,7 @@
public abstract class AbstractChannelWriteInterface implements IChannelWriteInterface {
+ public static final int REMOTE_WRITE_ERROR_CODE = 1;
private static final Logger LOGGER = LogManager.getLogger();
protected final IChannelControlBlock ccb;
protected final Queue<ByteBuffer> wiFullQueue;
@@ -135,7 +136,9 @@
return;
}
eos = true;
- adjustChannelWritability();
+ if (ecode != REMOTE_WRITE_ERROR_CODE) {
+ adjustChannelWritability();
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 286320b..a7fa49e 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.api.exceptions.NetException;
import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
import org.apache.hyracks.net.protocols.tcp.TCPConnection;
+import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -67,28 +68,7 @@
MultiplexedConnection(MuxDemux muxDemux) {
this.muxDemux = muxDemux;
- pendingWriteEventsCounter = new IEventCounter() {
- private int counter;
-
- @Override
- public synchronized void increment() {
- ++counter;
- if (counter == 1) {
- tcpConnection.enable(SelectionKey.OP_WRITE);
- }
- }
-
- @Override
- public synchronized void decrement() {
- --counter;
- if (counter == 0) {
- tcpConnection.disable(SelectionKey.OP_WRITE);
- }
- if (counter < 0) {
- throw new IllegalStateException();
- }
- }
- };
+ pendingWriteEventsCounter = new EventCounter();
cSet = new ChannelSet(this, pendingWriteEventsCounter);
readerState = new ReaderState();
writerState = new WriterState();
@@ -429,4 +409,32 @@
public IChannelInterfaceFactory getChannelInterfaceFactory() {
return muxDemux.getChannelInterfaceFactory();
}
+
+ @ThreadSafetyGuaranteedBy("MultiplexedConnection.this")
+ private class EventCounter implements IEventCounter {
+ private int counter;
+
+ @Override
+ public synchronized void increment() {
+ if (!connectionFailure) {
+ ++counter;
+ if (counter == 1) {
+ tcpConnection.enable(SelectionKey.OP_WRITE);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void decrement() {
+ if (!connectionFailure) {
+ --counter;
+ if (counter == 0) {
+ tcpConnection.disable(SelectionKey.OP_WRITE);
+ }
+ if (counter < 0) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index c12909c..c58cb86 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -111,7 +111,9 @@
@Override
public void connectionClosed(TCPConnection connection) {
synchronized (MuxDemux.this) {
- connectionMap.remove(connection.getRemoteAddress());
+ if (connection.getType() == TCPConnection.ConnectionType.OUTGOING) {
+ connectionMap.remove(connection.getRemoteAddress());
+ }
}
}
}, nThreads);
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
index b0e2eed..ff4627a 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
@@ -29,6 +29,11 @@
public class TCPConnection {
+ public enum ConnectionType {
+ INCOMING,
+ OUTGOING
+ }
+
private static final Logger LOGGER = LogManager.getLogger();
private final TCPEndpoint endpoint;
@@ -43,11 +48,15 @@
private Object attachment;
- public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector) {
+ private ConnectionType type;
+
+ public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector,
+ ConnectionType type) {
this.endpoint = endpoint;
this.channel = channel;
this.key = key;
this.selector = selector;
+ this.type = type;
remoteAddress = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
}
@@ -102,6 +111,10 @@
}
}
+ public ConnectionType getType() {
+ return type;
+ }
+
@Override
public String toString() {
return "TCPConnection[Remote Address: " + remoteAddress + " Local Address: " + endpoint.getLocalAddress() + "]";
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index affa59e..05e2175 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.net.protocols.tcp;
+import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -160,7 +162,8 @@
for (SocketChannel channel : workingIncomingConnections) {
register(channel);
SelectionKey sKey = channel.register(selector, 0);
- TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
+ TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector,
+ ConnectionType.INCOMING);
sKey.attach(connection);
synchronized (connectionListener) {
connectionListener.acceptedConnection(connection);
@@ -220,7 +223,8 @@
}
private void createConnection(SelectionKey key, SocketChannel channel) {
- TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, key, selector);
+ TCPConnection connection =
+ new TCPConnection(TCPEndpoint.this, channel, key, selector, ConnectionType.OUTGOING);
key.attach(connection);
key.interestOps(0);
synchronized (connectionListener) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
index 1209e17..3520e3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import org.apache.hyracks.storage.am.btree.impls.BTree;
@@ -162,7 +163,16 @@
searchCallback = lsmInitialState.getSearchOperationCallback();
predicate = (RangePredicate) lsmInitialState.getSearchPredicate();
numBTrees = operationalComponents.size();
- if (btreeCursors == null || btreeCursors.length != numBTrees) {
+ if (btreeCursors != null && btreeCursors.length != numBTrees) {
+ Throwable failure = CleanupUtils.destroy(null, btreeCursors);
+ btreeCursors = null;
+ failure = CleanupUtils.destroy(failure, btreeAccessors);
+ btreeAccessors = null;
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
+ }
+ }
+ if (btreeCursors == null) {
// object creation: should be relatively low
btreeCursors = new ITreeIndexCursor[numBTrees];
btreeAccessors = new BTreeAccessor[numBTrees];
@@ -175,8 +185,13 @@
BTree btree = (BTree) component.getIndex();
if (component.getType() == LSMComponentType.MEMORY) {
includeMutableComponent = true;
- bloomFilters[i] = null;
+ if (bloomFilters[i] != null) {
+ destroyAndNullifyCursorAtIndex(i);
+ }
} else {
+ if (bloomFilters[i] == null) {
+ destroyAndNullifyCursorAtIndex(i);
+ }
bloomFilters[i] = ((LSMBTreeWithBloomFilterDiskComponent) component).getBloomFilter();
}
@@ -193,6 +208,18 @@
foundTuple = false;
}
+ private void destroyAndNullifyCursorAtIndex(int i) throws HyracksDataException {
+ // component at location i was a disk component before, and is now a memory component, or vise versa
+ bloomFilters[i] = null;
+ Throwable failure = CleanupUtils.destroy(null, btreeCursors[i]);
+ btreeCursors[i] = null;
+ failure = CleanupUtils.destroy(failure, btreeAccessors[i]);
+ btreeAccessors[i] = null;
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
+ }
+ }
+
@Override
public void doNext() throws HyracksDataException {
nextHasBeenCalled = true;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index bfda985..a675047 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -47,10 +47,9 @@
public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
private final ArrayTupleReference copyTuple;
private final RangePredicate reusablePred;
-
private ISearchOperationCallback searchCallback;
-
private BTreeAccessor[] btreeAccessors;
+ private boolean[] isMemoryComponent;
private ArrayTupleBuilder tupleBuilder;
private boolean canCallProceed = true;
private boolean resultOfSearchCallbackProceed = false;
@@ -340,15 +339,19 @@
// object creation: should be relatively low
rangeCursors = new IIndexCursor[numBTrees];
btreeAccessors = new BTreeAccessor[numBTrees];
+ isMemoryComponent = new boolean[numBTrees];
} else if (rangeCursors.length != numBTrees) {
// should destroy first
Throwable failure = CleanupUtils.destroy(null, btreeAccessors);
+ btreeAccessors = null;
failure = CleanupUtils.destroy(failure, rangeCursors);
+ rangeCursors = null;
if (failure != null) {
throw HyracksDataException.create(failure);
}
rangeCursors = new IIndexCursor[numBTrees];
btreeAccessors = new BTreeAccessor[numBTrees];
+ isMemoryComponent = new boolean[numBTrees];
}
for (int i = 0; i < numBTrees; i++) {
ILSMComponent component = operationalComponents.get(i);
@@ -357,7 +360,7 @@
includeMutableComponent = true;
}
btree = (BTree) component.getIndex();
- if (btreeAccessors[i] == null) {
+ if (btreeAccessors[i] == null || destroyIncompatible(component, i)) {
btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
rangeCursors[i] = btreeAccessors[i].createSearchCursor(false);
} else {
@@ -365,6 +368,7 @@
btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
rangeCursors[i].close();
}
+ isMemoryComponent[i] = component.getType() == LSMComponentType.MEMORY;
}
IndexCursorUtils.open(btreeAccessors, rangeCursors, searchPred);
try {
@@ -377,6 +381,22 @@
}
}
+ private boolean destroyIncompatible(ILSMComponent component, int index) throws HyracksDataException {
+ // exclusive or. if the component is memory and the previous one at that index was a disk component
+ // or vice versa, then we should destroy the cursor and accessor since they need to be recreated
+ if (component.getType() == LSMComponentType.MEMORY ^ isMemoryComponent[index]) {
+ Throwable failure = CleanupUtils.destroy(null, btreeAccessors[index]);
+ btreeAccessors[index] = null;
+ failure = CleanupUtils.destroy(failure, rangeCursors[index]);
+ rangeCursors[index] = null;
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
+ }
+ return true;
+ }
+ return false;
+ }
+
@Override
public boolean getSearchOperationCallbackProceedResult() {
return resultOfSearchCallbackProceed;