Merge branch 'gerrit/neo'
Change-Id: Id22e3b96e31924a90c2cc7c7dee2aa828ba18ac6
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 5bcde3d..1ec7405 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -45,6 +45,7 @@
import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.http.api.IChannelClosedHandler;
@@ -76,6 +77,7 @@
QueryServiceRequestParameters param, RequestExecutionState executionState,
Map<String, String> optionalParameters, Map<String, byte[]> statementParameters,
ResponsePrinter responsePrinter, List<Warning> warnings) throws Exception {
+ ensureOptionalParameters(optionalParameters);
// Running on NC -> send 'execute' message to CC
INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
@@ -134,6 +136,10 @@
buildResponseResults(responsePrinter, sessionOutput, responseMsg.getExecutionPlans(), warnings);
}
+ protected void ensureOptionalParameters(Map<String, String> optionalParameters) throws HyracksDataException {
+
+ }
+
protected ExecuteStatementRequestMessage createRequestMessage(IServletRequest request,
IRequestReference requestReference, String statementsText, SessionOutput sessionOutput,
ResultProperties resultProperties, QueryServiceRequestParameters param,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
index 8cf70b2..3d0f7fc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
@@ -295,8 +295,7 @@
responseWriter.flush();
}
- protected boolean isRequestPermittedForWrite(IServletRequest request, IServletResponse response)
- throws IOException {
+ protected boolean isRequestPermitted(IServletRequest request, IServletResponse response) throws IOException {
if (!isRequestOnLoopback(request)) {
rejectForbidden(response);
return false;
@@ -322,14 +321,14 @@
@Override
protected void post(IServletRequest request, IServletResponse response) throws IOException {
- if (isRequestPermittedForWrite(request, response)) {
+ if (isRequestPermitted(request, response)) {
handleModification(request, response, LibraryOperation.UPSERT);
}
}
@Override
protected void delete(IServletRequest request, IServletResponse response) throws IOException {
- if (isRequestPermittedForWrite(request, response)) {
+ if (isRequestPermitted(request, response)) {
handleModification(request, response, LibraryOperation.DELETE);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 7b52df6..a159e09 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -57,14 +57,14 @@
* current replicas
*/
private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
- private final Set<Integer> nodeOwnedPartitions = new HashSet<>();
+ private final Set<Integer> nodeOriginatedPartitions = new HashSet<>();
public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) {
this.appCtx = appCtx;
for (Integer partition : partitions) {
this.partitions.put(partition, new Object());
}
- setNodeOwnedPartitions(appCtx);
+ setNodeOriginatedPartitions(appCtx);
}
@Override
@@ -163,8 +163,8 @@
}
@Override
- public boolean isPartitionOwner(int partition) {
- return nodeOwnedPartitions.contains(partition);
+ public boolean isPartitionOrigin(int partition) {
+ return nodeOriginatedPartitions.contains(partition);
}
public void closePartitionResources(int partition) throws HyracksDataException {
@@ -185,9 +185,9 @@
return id.getNodeId().equals(nodeId);
}
- private void setNodeOwnedPartitions(INcApplicationContext appCtx) {
+ private void setNodeOriginatedPartitions(INcApplicationContext appCtx) {
Set<Integer> nodePartitions =
appCtx.getMetadataProperties().getNodePartitions(appCtx.getServiceContext().getNodeId());
- nodeOwnedPartitions.addAll(nodePartitions);
+ nodeOriginatedPartitions.addAll(nodePartitions);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 7483a17..d15d9be 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -195,12 +195,14 @@
public synchronized void addIndex(long resourceID, IndexInfo indexInfo) {
indexes.put(resourceID, indexInfo);
partitionIndexes.computeIfAbsent(indexInfo.getPartition(), partition -> new HashSet<>()).add(indexInfo);
+ LOGGER.debug("registered reference to index {}", indexInfo);
}
public synchronized void removeIndex(long resourceID) {
IndexInfo info = indexes.remove(resourceID);
if (info != null) {
partitionIndexes.get(info.getPartition()).remove(info);
+ LOGGER.debug("removed reference to index {}", info);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
index b094b6f..8b631c2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
@@ -55,4 +55,10 @@
public LocalResource getLocalResource() {
return localResource;
}
+
+ @Override
+ public String toString() {
+ return "IndexInfo{" + "index=" + index + ", datasetId=" + datasetId + ", resourceId=" + resourceId
+ + ", partition=" + partition + ", localResource=" + localResource + '}';
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index a4d56ce..14fc9d8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -104,10 +104,10 @@
List<IPartitionReplica> getReplicas();
/**
- * Returns true if {@code partition} is owned by this node, otherwise false.
+ * Returns true if {@code partition} is originated by this node, otherwise false.
*
* @param partition
- * @return true if the partition is owned by this node, otherwise false.
+ * @return true if the partition is originated by this node, otherwise false.
*/
- boolean isPartitionOwner(int partition);
+ boolean isPartitionOrigin(int partition);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
index 1a5ba88..e85b9d4 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
@@ -90,7 +90,7 @@
return new PartitionResourcesListResponse(partition, partitionReplicatedResources, resources, owner);
}
- public boolean isOwner() {
+ public boolean isOrigin() {
return owner;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index 3ea252f..d9b3b0c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -59,7 +59,7 @@
localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
final PartitionResourcesListResponse response = new PartitionResourcesListResponse(partition,
- partitionReplicatedResources, partitionFiles, appCtx.getReplicaManager().isPartitionOwner(partition));
+ partitionReplicatedResources, partitionFiles, appCtx.getReplicaManager().isPartitionOrigin(partition));
ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
index 968f883..73fca9c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
@@ -55,7 +55,7 @@
final IIOManager ioManager = appCtx.getIoManager();
final ISocketChannel channel = replica.getChannel();
final FileReference filePath = ioManager.resolve(file);
- String masterNode = appCtx.getReplicaManager().isPartitionOwner(replica.getIdentifier().getPartition())
+ String masterNode = appCtx.getReplicaManager().isPartitionOrigin(replica.getIdentifier().getPartition())
? appCtx.getServiceContext().getNodeId() : null;
ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length(), metadata, masterNode);
LOGGER.debug("attempting to replicate {} to replica {}", task, replica);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
index 00e63ec..bfd3422 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
@@ -94,7 +94,7 @@
final FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, replica);
job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath).forEach(fileSynchronizer::replicate);
// send mark component valid
- String masterNode = appCtx.getReplicaManager().isPartitionOwner(replica.getIdentifier().getPartition())
+ String masterNode = appCtx.getReplicaManager().isPartitionOrigin(replica.getIdentifier().getPartition())
? appCtx.getServiceContext().getNodeId() : null;
MarkComponentValidTask markValidTask = new MarkComponentValidTask(indexFile, getReplicatedComponentLsn(),
getReplicatedComponentId(), masterNode);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 477559c..44c9404 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -72,7 +72,7 @@
}
PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition);
Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources(
- replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOwner());
+ replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOrigin());
// clean up files for invalid resources (deleted or recreated while the replica was down)
Set<String> deletedReplicaFiles =
cleanupReplicaInvalidResources(replicaResourceResponse, resourceReferenceLongMap);
@@ -155,7 +155,7 @@
if (!validReplicaResources.containsKey(replicaRes)) {
LOGGER.debug("replica invalid file {} to be deleted", replicaRes.getFileRelativePath());
invalidFiles.add(replicaResPath);
- } else if (replicaResourceResponse.isOwner() && !replicaRes.isMetadataResource()) {
+ } else if (replicaResourceResponse.isOrigin() && !replicaRes.isMetadataResource()) {
// find files where the owner generated and failed before replicating
Long masterValidSeq = validReplicaResources.get(replicaRes);
IndexComponentFileReference componentFileReference =
@@ -184,7 +184,7 @@
}
private Map<ResourceReference, Long> getValidReplicaResources(Map<String, Long> partitionReplicatedResources,
- boolean owner) throws HyracksDataException {
+ boolean origin) throws HyracksDataException {
Map<ResourceReference, Long> resource2ValidSeqMap = new HashMap<>();
for (Map.Entry<String, Long> resourceEntry : partitionReplicatedResources.entrySet()) {
ResourceReference rr = ResourceReference.of(resourceEntry.getKey());
@@ -196,7 +196,7 @@
LOGGER.info("replica has resource {} but with different resource id; ours {}, theirs {}", rr,
localResource.getId(), resourceEntry.getValue());
} else {
- long resourceMasterValidSeq = owner ? getResourceMasterValidSeq(rr) : Integer.MAX_VALUE;
+ long resourceMasterValidSeq = origin ? getResourceMasterValidSeq(rr) : Integer.MAX_VALUE;
resource2ValidSeqMap.put(rr, resourceMasterValidSeq);
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 2434686..a8eee4f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -73,7 +73,7 @@
private void checkpointReplicaIndexes() throws IOException {
final int partition = replica.getIdentifier().getPartition();
String masterNode =
- appCtx.getReplicaManager().isPartitionOwner(partition) ? appCtx.getServiceContext().getNodeId() : null;
+ appCtx.getReplicaManager().isPartitionOrigin(partition) ? appCtx.getServiceContext().getNodeId() : null;
CheckpointPartitionIndexesTask task =
new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition), masterNode);
ReplicationProtocol.sendTo(replica, task);
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index e433c02..9f01123 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -236,7 +236,13 @@
try {
return readFuture.get();
} catch (InterruptedException ex) { // NOSONAR -- interrupt or rethrow
- response.close();
+ executor.submit(() -> {
+ try {
+ response.close();
+ } catch (IOException e) {
+ LOGGER.debug("{} ignoring exception thrown on stream close due to interrupt", description, e);
+ }
+ });
try {
readFuture.get(1, TimeUnit.SECONDS);
} catch (TimeoutException te) {