Merged r432 from trunk
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@433 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index a431939..0987bd2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -16,6 +16,7 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
@@ -25,4 +26,6 @@
public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory);
public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+
+ public ICCContext getCCContext();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
new file mode 100644
index 0000000..266ebe9
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface ICCContext {
+ public Map<String, Set<String>> getIPAddressNodeMap();
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 1664531..ec8ab76 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -20,6 +20,7 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
@@ -91,7 +92,8 @@
* @param plan
* - Job Plan
*/
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan);
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+ ICCApplicationContext appCtx);
/**
* Indicate which consumer partitions may receive data from the given producer partition.
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index 3731459..78847eb 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -19,6 +19,7 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
@@ -73,7 +74,8 @@
* @param plan
* - Job Plan
*/
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan);
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+ ICCApplicationContext appCtx);
/**
* Translates this operator descriptor to JSON.
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
new file mode 100644
index 0000000..7552c17
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface INullWriter {
+ public void writeNull(DataOutput out) throws HyracksDataException;
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
new file mode 100644
index 0000000..6d9a744
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INullWriterFactory extends Serializable {
+ public INullWriter createNullWriter();
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
new file mode 100644
index 0000000..3251944
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITuplePairComparator {
+
+ public int compare(IFrameTupleAccessor outerRef, int outerIndex, IFrameTupleAccessor innerRef, int innerIndex)
+ throws HyracksDataException;
+
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
new file mode 100644
index 0000000..26cb525
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITuplePairComparatorFactory extends Serializable {
+
+ public ITuplePairComparator createTuplePairComparator();
+}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 4210a23..d0561ea 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -24,6 +24,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
@@ -35,6 +36,7 @@
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -67,6 +69,7 @@
import edu.uci.ics.hyracks.control.common.base.CCConfig;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.base.NCConfig;
import edu.uci.ics.hyracks.control.common.base.NodeParameters;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
@@ -82,6 +85,8 @@
private final Map<String, NodeControllerState> nodeRegistry;
+ private final Map<String, Set<String>> ipAddressNodeNameMap;
+
private final Map<String, CCApplicationContext> applications;
private final ServerContext serverCtx;
@@ -102,9 +107,12 @@
private final CCClientInterface ccci;
+ private final ICCContext ccContext;
+
public ClusterControllerService(CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+ ipAddressNodeNameMap = new HashMap<String, Set<String>>();
applications = new Hashtable<String, CCApplicationContext>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(
ClusterControllerService.class.getName()));
@@ -115,6 +123,12 @@
scheduler = new DefaultJobScheduler(this);
this.timer = new Timer(true);
ccci = new CCClientInterface(this);
+ ccContext = new ICCContext() {
+ @Override
+ public Map<String, Set<String>> getIPAddressNodeMap() {
+ return ipAddressNodeNameMap;
+ }
+ };
}
@Override
@@ -162,6 +176,10 @@
return nodeRegistry;
}
+ public Map<String, Set<String>> getIPAddressNodeNameMap() {
+ return ipAddressNodeNameMap;
+ }
+
public CCConfig getConfig() {
return ccConfig;
}
@@ -178,7 +196,8 @@
@Override
public NodeParameters registerNode(INodeController nodeController) throws Exception {
String id = nodeController.getId();
- NodeControllerState state = new NodeControllerState(nodeController);
+ NCConfig ncConfig = nodeController.getConfiguration();
+ NodeControllerState state = new NodeControllerState(nodeController, ncConfig);
jobQueue.scheduleAndSync(new RegisterNodeEvent(this, id, state));
nodeController.notifyRegistration(this);
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
@@ -249,7 +268,7 @@
if (applications.containsKey(appName)) {
throw new HyracksException("Duplicate application with name: " + appName + " being created.");
}
- CCApplicationContext appCtx = new CCApplicationContext(serverCtx, appName);
+ CCApplicationContext appCtx = new CCApplicationContext(serverCtx, ccContext, appName);
applications.put(appName, appCtx);
}
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index e614bc5..933504e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -1,14 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.hyracks.control.cc;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.base.NCConfig;
public class NodeControllerState {
private final INodeController nodeController;
+ private final NCConfig ncConfig;
+
+ private final Set<UUID> activeJobIds;
+
private int lastHeartbeatDuration;
- public NodeControllerState(INodeController nodeController) {
+ public NodeControllerState(INodeController nodeController, NCConfig ncConfig) {
this.nodeController = nodeController;
+ this.ncConfig = ncConfig;
+ activeJobIds = new HashSet<UUID>();
}
public void notifyHeartbeat() {
@@ -26,4 +51,12 @@
public INodeController getNodeController() {
return nodeController;
}
+
+ public NCConfig getNCConfig() {
+ return ncConfig;
+ }
+
+ public Set<UUID> getActiveJobIds() {
+ return activeJobIds;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 5ca0269..e375aad 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.hyracks.control.cc.application;
import java.io.IOException;
@@ -8,6 +22,7 @@
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
@@ -17,12 +32,15 @@
import edu.uci.ics.hyracks.control.common.context.ServerContext;
public class CCApplicationContext extends ApplicationContext implements ICCApplicationContext {
+ private final ICCContext ccContext;
+
private IJobSpecificationFactory jobSpecFactory;
private List<IJobLifecycleListener> jobLifecycleListeners;
- public CCApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+ public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext, String appName) throws IOException {
super(serverCtx, appName);
+ this.ccContext = ccContext;
jobSpecFactory = DeserializingJobSpecificationFactory.INSTANCE;
jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
}
@@ -33,6 +51,10 @@
bootstrap.start();
}
+ public ICCContext getCCContext() {
+ return ccContext;
+ }
+
@Override
public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory) {
this.jobSpecFactory = jobSpecFactory;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
index 6316b3e..5f9377b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
@@ -14,7 +14,9 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -38,5 +40,13 @@
throw new Exception("Node with this name already registered.");
}
nodeMap.put(nodeId, state);
+ Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
+ String ipAddress = state.getNCConfig().dataIPAddress;
+ Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
+ if (nodes == null) {
+ nodes = new HashSet<String>();
+ ipAddressNodeNameMap.put(ipAddress, nodes);
+ }
+ nodes.add(nodeId);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index b64a784..6e000cc 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -44,9 +44,17 @@
LOGGER.info(e.getKey() + " considered dead");
}
}
+ Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
for (String deadNode : deadNodes) {
NodeControllerState state = nodeMap.remove(deadNode);
// Deal with dead tasks.
+ String ipAddress = state.getNCConfig().dataIPAddress;
+ Set<String> ipNodes = ipAddressNodeNameMap.get(ipAddress);
+ if (ipNodes != null) {
+ if (ipNodes.remove(deadNode) && ipNodes.isEmpty()) {
+ ipAddressNodeNameMap.remove(ipAddress);
+ }
+ }
}
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
index a15469a..12215ff 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
@@ -25,6 +25,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.Constraint;
import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
@@ -230,6 +231,7 @@
try {
solver = new PartitionConstraintSolver();
final JobActivityGraph jag = jobRun.getJobActivityGraph();
+ final ICCApplicationContext appCtx = ccs.getApplicationMap().get(jag.getApplicationName());
JobSpecification spec = jag.getJobSpecification();
final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
@@ -241,13 +243,13 @@
PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
@Override
public void visit(IOperatorDescriptor op) {
- op.contributeSchedulingConstraints(acceptor, jag);
+ op.contributeSchedulingConstraints(acceptor, jag, appCtx);
}
});
PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
@Override
public void visit(IConnectorDescriptor conn) {
- conn.contributeSchedulingConstraints(acceptor, jag);
+ conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
}
});
contributedConstraints.addAll(spec.getUserConstraints());
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index c4c396f..eac578e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -24,7 +24,6 @@
import java.rmi.registry.Registry;
import java.text.MessageFormat;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
@@ -133,7 +132,7 @@
partitionManager = new PartitionManager(this, connectionManager.getNetworkAddress());
connectionManager.setPartitionRequestListener(partitionManager);
- jobletMap = new HashMap<UUID, Joblet>();
+ jobletMap = new Hashtable<UUID, Joblet>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
NodeControllerService.class.getName()), id));
@@ -256,26 +255,25 @@
LOGGER.info("Initializing " + taId + " -> " + han);
}
final int partition = tid.getPartition();
- Task task = new Task(joblet, taId, han.getClass().getName());
+ Task task = new Task(joblet, taId, han.getClass().getName(), executor);
IOperatorNodePushable operator = han.createPushRuntime(task,
joblet.getEnvironment(han.getActivityId().getOperatorDescriptorId(), partition), rdp,
partition, td.getPartitionCount());
- IPartitionCollector collector = null;
+ List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
List<IConnectorDescriptor> inputs = plan.getActivityInputConnectorDescriptors(tid.getActivityId());
if (inputs != null) {
for (int i = 0; i < inputs.size(); ++i) {
- if (i >= 1) {
- throw new HyracksException("Multiple inputs to an activity not currently supported");
- }
IConnectorDescriptor conn = inputs.get(i);
IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("input: " + i + ": " + conn.getConnectorId());
}
RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
- collector = createPartitionCollector(td, partition, task, i, conn, recordDesc, cPolicy);
+ IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
+ recordDesc, cPolicy);
+ collectors.add(collector);
}
}
List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getActivityId());
@@ -297,7 +295,7 @@
}
}
- task.setTaskRuntime(collector, operator);
+ task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
joblet.addTask(task);
task.start();
@@ -355,7 +353,7 @@
}
@Override
- public synchronized void cleanUpJob(UUID jobId) throws Exception {
+ public void cleanUpJob(UUID jobId) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Cleaning up after job: " + jobId);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index ac517cf..d5ce485 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -17,6 +17,8 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -46,29 +48,32 @@
private final String displayName;
+ private final Executor executor;
+
private final IWorkspaceFileFactory fileFactory;
private final DefaultDeallocatableRegistry deallocatableRegistry;
private final Map<String, Counter> counterMap;
- private IPartitionCollector collector;
+ private IPartitionCollector[] collectors;
private IOperatorNodePushable operator;
private volatile boolean aborted;
- public Task(Joblet joblet, TaskAttemptId taskId, String displayName) {
+ public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
this.joblet = joblet;
this.taskAttemptId = taskId;
this.displayName = displayName;
+ this.executor = executor;
fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
deallocatableRegistry = new DefaultDeallocatableRegistry();
counterMap = new HashMap<String, Counter>();
}
- public void setTaskRuntime(IPartitionCollector collector, IOperatorNodePushable operator) {
- this.collector = collector;
+ public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
+ this.collectors = collectors;
this.operator = operator;
}
@@ -145,8 +150,8 @@
public void abort() {
aborted = true;
- if (collector != null) {
- collector.abort();
+ for (IPartitionCollector c : collectors) {
+ c.abort();
}
}
@@ -158,36 +163,27 @@
ct.setName(displayName + ": " + taskAttemptId);
operator.initialize();
try {
- if (collector != null) {
- if (aborted) {
- return;
- }
- collector.open();
- try {
- joblet.advertisePartitionRequest(collector.getRequiredPartitionIds(), collector);
- IFrameReader reader = collector.getReader();
- reader.open();
- try {
- IFrameWriter writer = operator.getInputFrameWriter(0);
- writer.open();
- try {
- ByteBuffer buffer = allocateFrame();
- while (reader.nextFrame(buffer)) {
- if (aborted) {
- return;
- }
- buffer.flip();
- writer.nextFrame(buffer);
- buffer.compact();
+ if (collectors.length > 0) {
+ final Semaphore sem = new Semaphore(collectors.length - 1);
+ for (int i = 1; i < collectors.length; ++i) {
+ final IPartitionCollector collector = collectors[i];
+ final IFrameWriter writer = operator.getInputFrameWriter(i);
+ sem.acquire();
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ pushFrames(collector, writer);
+ } catch (HyracksDataException e) {
+ } finally {
+ sem.release();
}
- } finally {
- writer.close();
}
- } finally {
- reader.close();
- }
+ });
+ }
+ try {
+ pushFrames(collectors[0], operator.getInputFrameWriter(0));
} finally {
- collector.close();
+ sem.acquire(collectors.length - 1);
}
}
} finally {
@@ -202,4 +198,42 @@
close();
}
}
+
+ private void pushFrames(IPartitionCollector collector, IFrameWriter writer) throws HyracksDataException {
+ if (aborted) {
+ return;
+ }
+ try {
+ collector.open();
+ try {
+ joblet.advertisePartitionRequest(collector.getRequiredPartitionIds(), collector);
+ IFrameReader reader = collector.getReader();
+ reader.open();
+ try {
+ writer.open();
+ try {
+ ByteBuffer buffer = allocateFrame();
+ while (reader.nextFrame(buffer)) {
+ if (aborted) {
+ return;
+ }
+ buffer.flip();
+ writer.nextFrame(buffer);
+ buffer.compact();
+ }
+ } finally {
+ writer.close();
+ }
+ } finally {
+ reader.close();
+ }
+ } finally {
+ collector.close();
+ }
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 4ec1b6f..7647e50 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -118,30 +118,32 @@
return false;
}
- public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots, byte[] bytes, int offset,
- int length) {
+ public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
+ int dataLen1) {
int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
int length0 = endOffset0 - startOffset0;
- if (tupleDataEndOffset + length0 + fieldSlots.length * 4 + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+ int slotsLen1 = fieldSlots1.length * 4;
+ int length1 = slotsLen1 + dataLen1;
+
+ if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
ByteBuffer src0 = accessor0.getBuffer();
int slotsLen0 = accessor0.getFieldSlotsLength();
int dataLen0 = length0 - slotsLen0;
// Copy slots from accessor0 verbatim
System.arraycopy(src0.array(), startOffset0, buffer.array(), tupleDataEndOffset, slotsLen0);
- // Copy slots from fieldSlots with the following transformation:
- // newSlotIdx = oldSlotIdx + dataLen0
- for (int i = 0; i < fieldSlots.length; ++i) {
- buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots[i] + dataLen0));
+ // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+ for (int i = 0; i < fieldSlots1.length; ++i) {
+ buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
}
// Copy data0
System.arraycopy(src0.array(), startOffset0 + slotsLen0, buffer.array(), tupleDataEndOffset + slotsLen0
- + fieldSlots.length * 4, dataLen0);
- // Copy bytes
- System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots.length * 4
- + dataLen0, length);
- tupleDataEndOffset += (length0 + fieldSlots.length * 4 + length);
+ + slotsLen1, dataLen0);
+ // Copy bytes1
+ System.arraycopy(bytes1, offset1, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4
+ + dataLen0, dataLen1);
+ tupleDataEndOffset += (length0 + length1);
buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
++tupleCount;
buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
@@ -150,6 +152,41 @@
return false;
}
+ public boolean appendConcat(int[] fieldSlots0, byte[] bytes0, int offset0, int dataLen0, IFrameTupleAccessor accessor1,
+ int tIndex1) {
+ int slotsLen0 = fieldSlots0.length * 4;
+ int length0 = slotsLen0 + dataLen0;
+
+ int startOffset1 = accessor1.getTupleStartOffset(tIndex1);
+ int endOffset1 = accessor1.getTupleEndOffset(tIndex1);
+ int length1 = endOffset1 - startOffset1;
+
+ if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+ ByteBuffer src1 = accessor1.getBuffer();
+ int slotsLen1 = accessor1.getFieldSlotsLength();
+ int dataLen1 = length1 - slotsLen1;
+ // Copy fieldSlots0 verbatim
+ for (int i = 0; i < fieldSlots0.length; ++i) {
+ buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots0[i]);
+ }
+ // Copy slots from accessor1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+ for (int i = 0; i < slotsLen1 / 4; ++i) {
+ buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, src1.getInt(startOffset1 + i * 4) + dataLen0);
+ }
+ // Copy bytes0
+ System.arraycopy(bytes0, offset0, buffer.array(), tupleDataEndOffset + slotsLen0 + slotsLen1,
+ dataLen0);
+ // Copy data1
+ System.arraycopy(src1.array(), startOffset1 + slotsLen1, buffer.array(), tupleDataEndOffset + slotsLen0
+ + slotsLen1 + dataLen0, dataLen1);
+ tupleDataEndOffset += (length0 + length1);
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+ ++tupleCount;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+ return true;
+ }
+ return false;
+ }
public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) {
int fTargetSlotsLength = fields.length * 4;
int length = fTargetSlotsLength;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 19965b9..76cb1f0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -19,6 +19,7 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -50,7 +51,8 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan) {
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+ ICCApplicationContext appCtx) {
// do nothing
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 8b16068..97ca255 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -19,6 +19,7 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -68,7 +69,8 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan) {
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+ ICCApplicationContext appCtx) {
// do nothing
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index 556466f..67130c9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -16,6 +16,7 @@
import java.util.BitSet;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
@@ -55,7 +56,8 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan) {
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+ ICCApplicationContext appCtx) {
JobSpecification jobSpec = plan.getJobSpecification();
IOperatorDescriptor consumer = jobSpec.getConsumer(this);
IOperatorDescriptor producer = jobSpec.getProducer(this);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 27ae2db..c60c0af 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -44,8 +46,8 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final String SMALLRELATION = "RelR";
- private static final String LARGERELATION = "RelS";
+ private static final String RELATION0 = "Rel0";
+ private static final String RELATION1 = "Rel1";
private static final long serialVersionUID = 1L;
private final int[] keys0;
@@ -56,6 +58,8 @@
private final double factor;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
@@ -69,15 +73,33 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+ INullWriterFactory[] nullWriterFactories1) {
+ super(spec, 2, 1);
+ this.memsize = memsize;
+ this.inputsize0 = inputsize0;
+ this.recordsPerFrame = recordsPerFrame;
+ this.factor = factor;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
recordDescriptors[0] = recordDescriptor;
}
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, 0), SMALLRELATION, keys0,
- 0);
- HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, 1), LARGERELATION, keys1,
- 1);
+ HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, 0), RELATION0, keys0, 0);
+ HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, 1), RELATION1, keys1, 1);
JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, 2));
builder.addActivity(rpart);
@@ -217,18 +239,24 @@
}
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
private InMemoryHashJoin joiner;
- private RunFileWriter[] rWriters;
- private RunFileWriter[] sWriters;
+ private RunFileWriter[] buildWriters;
+ private RunFileWriter[] probeWriters;
private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
@Override
public void initialize() throws HyracksDataException {
- rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
- sWriters = (RunFileWriter[]) env.get(LARGERELATION);
+ buildWriters = (RunFileWriter[]) env.get(RELATION1);
+ probeWriters = (RunFileWriter[]) env.get(RELATION0);
ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
@@ -241,34 +269,36 @@
// buffer
int tableSize = (int) (numPartitions * recordsPerFrame * factor);
for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
- RunFileWriter rWriter = rWriters[partitionid];
- RunFileWriter sWriter = sWriters[partitionid];
- if (rWriter == null || sWriter == null) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
+ if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
- new FrameTuplePairComparator(keys0, keys1, comparators));
+ new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
// build
- RunFileReader rReader = rWriter.createReader();
- rReader.open();
- while (rReader.nextFrame(buffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame();
- FrameUtils.copy(buffer, copyBuffer);
- joiner.build(copyBuffer);
- buffer.clear();
+ if (buildWriter != null) {
+ RunFileReader buildReader = buildWriter.createReader();
+ buildReader.open();
+ while (buildReader.nextFrame(buffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(buffer, copyBuffer);
+ joiner.build(copyBuffer);
+ buffer.clear();
+ }
+ buildReader.close();
}
- rReader.close();
// probe
- RunFileReader sReader = sWriter.createReader();
- sReader.open();
- while (sReader.nextFrame(buffer)) {
+ RunFileReader probeReader = probeWriter.createReader();
+ probeReader.open();
+ while (probeReader.nextFrame(buffer)) {
joiner.join(buffer, writer);
buffer.clear();
}
- sReader.close();
+ probeReader.close();
joiner.closeJoin(writer);
}
writer.close();
@@ -276,8 +306,8 @@
@Override
public void deinitialize() throws HyracksDataException {
- env.set(LARGERELATION, null);
- env.set(SMALLRELATION, null);
+ env.set(RELATION1, null);
+ env.set(RELATION0, null);
}
};
return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index df7440b..7c9134f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -46,19 +48,21 @@
public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final String JOINER0 = "joiner0";
- private static final String SMALLRELATION = "RelR";
- private static final String LARGERELATION = "RelS";
+ private static final String BUILDRELATION = "BuildRel";
+ private static final String PROBERELATION = "ProbeRel";
private static final String MEM_HASHTABLE = "MEMORY_HASHTABLE";
private static final String NUM_PARTITION = "NUMBER_B_PARTITIONS"; // B
private final int memsize;
private static final long serialVersionUID = 1L;
private final int inputsize0;
private final double factor;
+ private final int recordsPerFrame;
private final int[] keys0;
private final int[] keys1;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
- private final int recordsPerFrame;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
/**
* @param spec
@@ -88,19 +92,39 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ public HybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+ INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+ super(spec, 2, 1);
+ this.memsize = memsize;
+ this.inputsize0 = inputsize0;
+ this.factor = factor;
+ this.recordsPerFrame = recordsPerFrame;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
recordDescriptors[0] = recordDescriptor;
}
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId, 0), SMALLRELATION);
- PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId, 1), LARGERELATION);
+ BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId, 0), BUILDRELATION);
+ PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId, 1), PROBERELATION);
builder.addActivity(phase1);
- builder.addSourceEdge(0, phase1, 0);
+ builder.addSourceEdge(1, phase1, 0);
builder.addActivity(phase2);
- builder.addSourceEdge(1, phase2, 0);
+ builder.addSourceEdge(0, phase2, 0);
builder.addBlockingEdge(phase1, phase2);
@@ -125,12 +149,18 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private InMemoryHashJoin joiner0;
- private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
- ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
- .createPartitioner();
+ private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
+ private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
+ hashFunctionFactories).createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
private ByteBuffer[] bufferForPartitions;
@@ -146,8 +176,8 @@
for (int i = 0; i < B; i++) {
ByteBuffer buf = bufferForPartitions[i];
- accessor0.reset(buf);
- if (accessor0.getTupleCount() > 0) {
+ accessorBuild.reset(buf);
+ if (accessorBuild.getTupleCount() > 0) {
write(i, buf);
}
closeWriter(i);
@@ -163,17 +193,17 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (memoryForHashtable != memsize - 2) {
- accessor0.reset(buffer);
- int tCount = accessor0.getTupleCount();
+ accessorBuild.reset(buffer);
+ int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
int entry = -1;
if (memoryForHashtable == 0) {
- entry = hpc0.partition(accessor0, i, B);
+ entry = hpcBuild.partition(accessorBuild, i, B);
boolean newBuffer = false;
ByteBuffer bufBi = bufferForPartitions[entry];
while (true) {
appender.reset(bufBi, newBuffer);
- if (appender.append(accessor0, i)) {
+ if (appender.append(accessorBuild, i)) {
break;
} else {
write(entry, bufBi);
@@ -182,15 +212,16 @@
}
}
} else {
- entry = hpc0.partition(accessor0, i, (int) (inputsize0 * factor / nPartitions));
+ entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
if (entry < memoryForHashtable) {
while (true) {
- if (!ftappender.append(accessor0, i)) {
+ if (!ftappender.append(accessorBuild, i)) {
build(inBuffer);
ftappender.reset(inBuffer, true);
- } else
+ } else {
break;
+ }
}
} else {
entry %= B;
@@ -198,7 +229,7 @@
ByteBuffer bufBi = bufferForPartitions[entry];
while (true) {
appender.reset(bufBi, newBuffer);
- if (appender.append(accessor0, i)) {
+ if (appender.append(accessorBuild, i)) {
break;
} else {
write(entry, bufBi);
@@ -253,7 +284,7 @@
int tableSize = (int) (memoryForHashtable * recordsPerFrame * factor);
joiner0 = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
- keys0, keys1, comparators));
+ keys0, keys1, comparators), isLeftOuter, nullWriters1);
bufferForPartitions = new ByteBuffer[B];
fWriters = new RunFileWriter[B];
for (int i = 0; i < B; i++) {
@@ -291,11 +322,11 @@
private class PartitionAndJoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- private String largeRelation;
+ private String relationName;
public PartitionAndJoinActivityNode(ActivityId id, String relationName) {
super(id);
- this.largeRelation = relationName;
+ this.relationName = relationName;
}
@Override
@@ -307,22 +338,28 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private InMemoryHashJoin joiner0;
- private final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
- private ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
+ private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+ private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
hashFunctionFactories);
- private ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
+ private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
hashFunctionFactories);
- ITuplePartitionComputer hpc1 = hpcf1.createPartitioner();
+ private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
private final FrameTupleAppender ftap = new FrameTupleAppender(ctx.getFrameSize());
private final ByteBuffer inBuffer = ctx.allocateFrame();
private final ByteBuffer outBuffer = ctx.allocateFrame();
- private RunFileWriter[] rWriters;
- private RunFileWriter[] sWriters;
+ private RunFileWriter[] buildWriters;
+ private RunFileWriter[] probeWriters;
private ByteBuffer[] bufferForPartitions;
private int B;
private int memoryForHashtable;
@@ -331,10 +368,10 @@
public void open() throws HyracksDataException {
joiner0 = (InMemoryHashJoin) env.get(JOINER0);
writer.open();
- rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
+ buildWriters = (RunFileWriter[]) env.get(BUILDRELATION);
B = (Integer) env.get(NUM_PARTITION);
memoryForHashtable = (Integer) env.get(MEM_HASHTABLE);
- sWriters = new RunFileWriter[B];
+ probeWriters = new RunFileWriter[B];
bufferForPartitions = new ByteBuffer[B];
for (int i = 0; i < B; i++) {
bufferForPartitions[i] = ctx.allocateFrame();
@@ -346,18 +383,18 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (memoryForHashtable != memsize - 2) {
- accessor1.reset(buffer);
- int tupleCount1 = accessor1.getTupleCount();
- for (int i = 0; i < tupleCount1; ++i) {
+ accessorProbe.reset(buffer);
+ int tupleCount0 = accessorProbe.getTupleCount();
+ for (int i = 0; i < tupleCount0; ++i) {
int entry = -1;
if (memoryForHashtable == 0) {
- entry = hpc1.partition(accessor1, i, B);
+ entry = hpcProbe.partition(accessorProbe, i, B);
boolean newBuffer = false;
ByteBuffer outbuf = bufferForPartitions[entry];
while (true) {
appender.reset(outbuf, newBuffer);
- if (appender.append(accessor1, i)) {
+ if (appender.append(accessorProbe, i)) {
break;
} else {
write(entry, outbuf);
@@ -366,10 +403,10 @@
}
}
} else {
- entry = hpc1.partition(accessor1, i, (int) (inputsize0 * factor / nPartitions));
+ entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
if (entry < memoryForHashtable) {
while (true) {
- if (!ftap.append(accessor1, i)) {
+ if (!ftap.append(accessorProbe, i)) {
joiner0.join(inBuffer, writer);
ftap.reset(inBuffer, true);
} else
@@ -382,7 +419,7 @@
ByteBuffer outbuf = bufferForPartitions[entry];
while (true) {
appender.reset(outbuf, newBuffer);
- if (appender.append(accessor1, i)) {
+ if (appender.append(accessorProbe, i)) {
break;
} else {
write(entry, outbuf);
@@ -407,8 +444,8 @@
if (memoryForHashtable != memsize - 2) {
for (int i = 0; i < B; i++) {
ByteBuffer buf = bufferForPartitions[i];
- accessor1.reset(buf);
- if (accessor1.getTupleCount() > 0) {
+ accessorProbe.reset(buf);
+ if (accessorProbe.getTupleCount() > 0) {
write(i, buf);
}
closeWriter(i);
@@ -422,40 +459,43 @@
tableSize = (int) (memsize * recordsPerFrame * factor);
}
for (int partitionid = 0; partitionid < B; partitionid++) {
- RunFileWriter rWriter = rWriters[partitionid];
- RunFileWriter sWriter = sWriters[partitionid];
- if (rWriter == null || sWriter == null) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
+ if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
- hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators));
+ hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
+ nullWriters1);
- RunFileReader rReader = rWriter.createReader();
- rReader.open();
- while (rReader.nextFrame(inBuffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame();
- FrameUtils.copy(inBuffer, copyBuffer);
- joiner.build(copyBuffer);
- inBuffer.clear();
+ if (buildWriter != null) {
+ RunFileReader buildReader = buildWriter.createReader();
+ buildReader.open();
+ while (buildReader.nextFrame(inBuffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(inBuffer, copyBuffer);
+ joiner.build(copyBuffer);
+ inBuffer.clear();
+ }
+ buildReader.close();
}
- rReader.close();
// probe
- RunFileReader sReader = sWriter.createReader();
- sReader.open();
- while (sReader.nextFrame(inBuffer)) {
+ RunFileReader probeReader = probeWriter.createReader();
+ probeReader.open();
+ while (probeReader.nextFrame(inBuffer)) {
joiner.join(inBuffer, writer);
inBuffer.clear();
}
- sReader.close();
+ probeReader.close();
joiner.closeJoin(writer);
}
}
writer.close();
- env.set(LARGERELATION, null);
- env.set(SMALLRELATION, null);
+ env.set(PROBERELATION, null);
+ env.set(BUILDRELATION, null);
env.set(JOINER0, null);
env.set(MEM_HASHTABLE, null);
env.set(NUM_PARTITION, null);
@@ -467,19 +507,19 @@
}
private void closeWriter(int i) throws HyracksDataException {
- RunFileWriter writer = sWriters[i];
+ RunFileWriter writer = probeWriters[i];
if (writer != null) {
writer.close();
}
}
private void write(int i, ByteBuffer head) throws HyracksDataException {
- RunFileWriter writer = sWriters[i];
+ RunFileWriter writer = probeWriters[i];
if (writer == null) {
- FileReference file = ctx.createManagedWorkspaceFile(largeRelation);
+ FileReference file = ctx.createManagedWorkspaceFile(relationName);
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
- sWriters[i] = writer;
+ probeWriters[i] = writer;
}
writer.nextFrame(head);
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index a49c21a9..ebeceeb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.dataflow.std.join;
+import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -21,8 +22,10 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -30,36 +33,51 @@
public class InMemoryHashJoin {
private final Link[] table;
private final List<ByteBuffer> buffers;
- private final FrameTupleAccessor accessor0;
- private final ITuplePartitionComputer tpc0;
- private final FrameTupleAccessor accessor1;
- private final ITuplePartitionComputer tpc1;
+ private final FrameTupleAccessor accessorBuild;
+ private final ITuplePartitionComputer tpcBuild;
+ private final FrameTupleAccessor accessorProbe;
+ private final ITuplePartitionComputer tpcProbe;
private final FrameTupleAppender appender;
private final FrameTuplePairComparator tpComparator;
private final ByteBuffer outBuffer;
-
+ private final boolean isLeftOuter;
+ private final ArrayTupleBuilder nullTupleBuild;
+
public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
- FrameTuplePairComparator comparator) {
+ FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
+ throws HyracksDataException {
table = new Link[tableSize];
buffers = new ArrayList<ByteBuffer>();
- this.accessor0 = accessor0;
- this.tpc0 = tpc0;
- this.accessor1 = accessor1;
- this.tpc1 = tpc1;
+ this.accessorBuild = accessor1;
+ this.tpcBuild = tpc1;
+ this.accessorProbe = accessor0;
+ this.tpcProbe = tpc0;
appender = new FrameTupleAppender(ctx.getFrameSize());
tpComparator = comparator;
outBuffer = ctx.allocateFrame();
appender.reset(outBuffer, true);
+ this.isLeftOuter = isLeftOuter;
+ if (isLeftOuter) {
+ int fieldCountOuter = accessor1.getFieldCount();
+ nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter);
+ DataOutput out = nullTupleBuild.getDataOutput();
+ for (int i = 0; i < fieldCountOuter; i++) {
+ nullWriters1[i].writeNull(out);
+ nullTupleBuild.addFieldEndOffset();
+ }
+ } else {
+ nullTupleBuild = null;
+ }
}
public void build(ByteBuffer buffer) throws HyracksDataException {
buffers.add(buffer);
int bIndex = buffers.size() - 1;
- accessor0.reset(buffer);
- int tCount = accessor0.getTupleCount();
+ accessorBuild.reset(buffer);
+ int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
- int entry = tpc0.partition(accessor0, i, table.length);
+ int entry = tpcBuild.partition(accessorBuild, i, table.length);
long tPointer = (((long) bIndex) << 32) + i;
Link link = table[entry];
if (link == null) {
@@ -70,29 +88,41 @@
}
public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
- accessor1.reset(buffer);
- int tupleCount1 = accessor1.getTupleCount();
- for (int i = 0; i < tupleCount1; ++i) {
- int entry = tpc1.partition(accessor1, i, table.length);
+ accessorProbe.reset(buffer);
+ int tupleCount0 = accessorProbe.getTupleCount();
+ for (int i = 0; i < tupleCount0; ++i) {
+ int entry = tpcProbe.partition(accessorProbe, i, table.length);
Link link = table[entry];
+ boolean matchFound = false;
if (link != null) {
for (int j = 0; j < link.size; ++j) {
long pointer = link.pointers[j];
int bIndex = (int) ((pointer >> 32) & 0xffffffff);
int tIndex = (int) (pointer & 0xffffffff);
- accessor0.reset(buffers.get(bIndex));
- int c = tpComparator.compare(accessor0, tIndex, accessor1, i);
+ accessorBuild.reset(buffers.get(bIndex));
+ int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
if (c == 0) {
- if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+ matchFound = true;
+ if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
flushFrame(outBuffer, writer);
appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+ if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
throw new IllegalStateException();
}
}
}
}
}
+ if (!matchFound && isLeftOuter) {
+ if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild
+ .getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index b4797e8..ce8be1c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -46,6 +48,8 @@
private final int[] keys1;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
private final int tableSize;
public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
@@ -56,8 +60,25 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
- this.tableSize = tableSize;
recordDescriptors[0] = recordDescriptor;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ this.tableSize = tableSize;
+ }
+
+ public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+ int tableSize) {
+ super(spec, 2, 1);
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ recordDescriptors[0] = recordDescriptor;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
+ this.tableSize = tableSize;
}
@Override
@@ -66,10 +87,11 @@
HashProbeActivityNode hpa = new HashProbeActivityNode(new ActivityId(odId, 1));
builder.addActivity(hba);
- builder.addSourceEdge(0, hba, 0);
+ builder.addSourceEdge(1, hba, 0);
builder.addActivity(hpa);
- builder.addSourceEdge(1, hpa, 0);
+ builder.addSourceEdge(0, hpa, 0);
+
builder.addTargetEdge(0, hpa, 0);
builder.addBlockingEdge(hba, hpa);
@@ -91,6 +113,13 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private InMemoryHashJoin joiner;
@@ -102,7 +131,7 @@
.createPartitioner();
joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
- keys0, keys1, comparators));
+ keys0, keys1, comparators), isLeftOuter, nullWriters1);
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
new file mode 100644
index 0000000..e92d787
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public class NestedLoopJoin {
+ private final FrameTupleAccessor accessorInner;
+ private final FrameTupleAccessor accessorOuter;
+ private final FrameTupleAppender appender;
+ private final ITuplePairComparator tpComparator;
+ private final ByteBuffer outBuffer;
+ private final ByteBuffer innerBuffer;
+ private final List<ByteBuffer> outBuffers;
+ private final int memSize;
+ private final IHyracksTaskContext ctx;
+ private RunFileReader runFileReader;
+ private int currentMemSize = 0;
+ private final RunFileWriter runFileWriter;
+
+ public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
+ ITuplePairComparator comparators, int memSize) throws HyracksDataException {
+ this.accessorInner = accessor1;
+ this.accessorOuter = accessor0;
+ this.appender = new FrameTupleAppender(ctx.getFrameSize());
+ this.tpComparator = comparators;
+ this.outBuffer = ctx.allocateFrame();
+ this.innerBuffer = ctx.allocateFrame();
+ this.appender.reset(outBuffer, true);
+ this.outBuffers = new ArrayList<ByteBuffer>();
+ this.memSize = memSize;
+ this.ctx = ctx;
+
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ this.getClass().getSimpleName() + this.toString());
+ runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+ runFileWriter.open();
+ }
+
+ public void cache(ByteBuffer buffer) throws HyracksDataException {
+ runFileWriter.nextFrame(buffer);
+ System.out.println(runFileWriter.getFileSize());
+ }
+
+ public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
+ if (outBuffers.size() < memSize - 3) {
+ createAndCopyFrame(outerBuffer);
+ return;
+ }
+ if (currentMemSize < memSize - 3) {
+ reloadFrame(outerBuffer);
+ return;
+ }
+ for (ByteBuffer outBuffer : outBuffers) {
+ runFileReader = runFileWriter.createReader();
+ runFileReader.open();
+ while (runFileReader.nextFrame(innerBuffer)) {
+ blockJoin(outBuffer, innerBuffer, writer);
+ }
+ runFileReader.close();
+ }
+ currentMemSize = 0;
+ reloadFrame(outerBuffer);
+ }
+
+ private void createAndCopyFrame(ByteBuffer outerBuffer) {
+ ByteBuffer outerBufferCopy = ctx.allocateFrame();
+ FrameUtils.copy(outerBuffer, outerBufferCopy);
+ outBuffers.add(outerBufferCopy);
+ currentMemSize++;
+ }
+
+ private void reloadFrame(ByteBuffer outerBuffer) {
+ outBuffers.get(currentMemSize).clear();
+ FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
+ currentMemSize++;
+ }
+
+ private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer, IFrameWriter writer)
+ throws HyracksDataException {
+ accessorOuter.reset(outerBuffer);
+ accessorInner.reset(innerBuffer);
+ int tupleCount0 = accessorOuter.getTupleCount();
+ int tupleCount1 = accessorInner.getTupleCount();
+
+ for (int i = 0; i < tupleCount0; ++i) {
+ for (int j = 0; j < tupleCount1; ++j) {
+ int c = compare(accessorOuter, i, accessorInner, j);
+ if (c == 0) {
+ if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void closeCache() throws HyracksDataException {
+ if (runFileWriter != null) {
+ runFileWriter.close();
+ }
+ }
+
+ public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+ for (ByteBuffer outBuffer : outBuffers) {
+ runFileReader = runFileWriter.createReader();
+ runFileReader.open();
+ while (runFileReader.nextFrame(innerBuffer)) {
+ blockJoin(outBuffer, innerBuffer, writer);
+ }
+ runFileReader.close();
+ }
+ outBuffers.clear();
+ currentMemSize = 0;
+
+ if (appender.getTupleCount() > 0) {
+ flushFrame(outBuffer, writer);
+ }
+ }
+
+ private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ writer.nextFrame(buffer);
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ }
+
+ private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
+ throws HyracksDataException {
+ int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
+ if (c != 0) {
+ return c;
+ }
+ return 0;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
new file mode 100644
index 0000000..5b5ace1
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final String JOINER = "joiner";
+
+ private static final long serialVersionUID = 1L;
+ private final ITuplePairComparatorFactory comparatorFactory;
+ private final int memSize;
+
+ public NestedLoopJoinOperatorDescriptor(JobSpecification spec, ITuplePairComparatorFactory comparatorFactory,
+ RecordDescriptor recordDescriptor, int memSize) {
+ super(spec, 2, 1);
+ this.comparatorFactory = comparatorFactory;
+ this.recordDescriptors[0] = recordDescriptor;
+ this.memSize = memSize;
+ }
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ JoinCacheActivityNode jc = new JoinCacheActivityNode(new ActivityId(getOperatorId(), 0));
+ NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode(new ActivityId(getOperatorId(), 1));
+
+ builder.addActivity(jc);
+ builder.addSourceEdge(1, jc, 0);
+
+ builder.addActivity(nlj);
+ builder.addSourceEdge(0, nlj, 0);
+
+ builder.addTargetEdge(0, nlj, 0);
+ builder.addBlockingEdge(jc, nlj);
+ }
+
+ private class JoinCacheActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public JoinCacheActivityNode(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator();
+
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ private NestedLoopJoin joiner;
+
+ @Override
+ public void open() throws HyracksDataException {
+ joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+ new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(buffer, copyBuffer);
+ copyBuffer.flip();
+ joiner.cache(copyBuffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ joiner.closeCache();
+ env.set(JOINER, joiner);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
+ };
+ return op;
+ }
+ }
+
+ private class NestedLoopJoinActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public NestedLoopJoinActivityNode(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+
+ IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private NestedLoopJoin joiner;
+
+ @Override
+ public void open() throws HyracksDataException {
+ joiner = (NestedLoopJoin) env.get(JOINER);
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ joiner.join(buffer, writer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ joiner.closeJoin(writer);
+ writer.close();
+ env.set(JOINER, null);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ writer.flush();
+ }
+ };
+ return op;
+ }
+ }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
new file mode 100644
index 0000000..ff39cc8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -0,0 +1,65 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+public class SplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public SplitOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, int outputArity) {
+ super(spec, 1, outputArity);
+ for (int i = 0; i < outputArity; i++) {
+ recordDescriptors[i] = rDesc;
+ }
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IOperatorEnvironment env,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryInputOperatorNodePushable() {
+ private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+
+ @Override
+ public void close() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.close();
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ FrameUtils.flushFrame(bufferAccessor, writer);
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.open();
+ }
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ writers[index] = writer;
+ }
+ };
+ }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
new file mode 100644
index 0000000..f5ecde6
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.union;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+
+public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
+ public UnionAllOperatorDescriptor(JobSpecification spec, int nInputs, RecordDescriptor recordDescriptor) {
+ super(spec, nInputs, 1);
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ UnionActivityNode uba = new UnionActivityNode(new ActivityId(getOperatorId(), 0));
+ builder.addActivity(uba);
+ for (int i = 0; i < inputArity; ++i) {
+ builder.addSourceEdge(i, uba, i);
+ }
+ builder.addTargetEdge(0, uba, 0);
+ }
+
+ private class UnionActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public UnionActivityNode(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ return new UnionOperator(ctx, inRecordDesc);
+ }
+ }
+
+ private class UnionOperator extends AbstractUnaryOutputOperatorNodePushable {
+ private int nOpened;
+
+ private int nClosed;
+
+ public UnionOperator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc) {
+ nOpened = 0;
+ nClosed = 0;
+ }
+
+ @Override
+ public int getInputArity() {
+ return inputArity;
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return new IFrameWriter() {
+ @Override
+ public void open() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ if (++nOpened == 1) {
+ writer.open();
+ }
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ writer.nextFrame(buffer);
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ writer.flush();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ if (++nClosed == inputArity) {
+ writer.close();
+ }
+ }
+ }
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
new file mode 100644
index 0000000..2b32142
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+
+public class SplitOperatorTest extends AbstractIntegrationTest {
+
+ public void compareFiles(String fileNameA, String fileNameB) throws IOException {
+ BufferedReader fileA = new BufferedReader(new FileReader(fileNameA));
+ BufferedReader fileB = new BufferedReader(new FileReader(fileNameB));
+
+ String lineA, lineB;
+ while ((lineA = fileA.readLine()) != null) {
+ lineB = fileB.readLine();
+ Assert.assertEquals(lineA, lineB);
+ }
+ Assert.assertNull(fileB.readLine());
+ }
+
+ @Test
+ public void test() throws Exception {
+ final int outputArity = 2;
+
+ JobSpecification spec = new JobSpecification();
+
+ String inputFileName = "data/words.txt";
+ File[] outputFile = new File[outputArity];
+ for (int i = 0; i < outputArity; i++) {
+ outputFile[i] = File.createTempFile("splitop", null);
+ outputFile[i].deleteOnExit();
+ }
+
+ FileSplit[] inputSplits = new FileSplit[] { new FileSplit(NC1_ID, inputFileName) };
+
+ String[] locations = new String[] { NC1_ID };
+
+ DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory(
+ new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000');
+ RecordDescriptor stringRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE, });
+
+ FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, new ConstantFileSplitProvider(
+ inputSplits), stringParser, stringRec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations);
+
+ SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, locations);
+
+ IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
+ for (int i = 0; i < outputArity; i++) {
+ outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] { new FileSplit(NC1_ID,
+ outputFile[i].getAbsolutePath()) });
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
+ }
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
+ for (int i = 0; i < outputArity; i++) {
+ spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
+ }
+
+ for (int i = 0; i < outputArity; i++) {
+ spec.addRoot(outputOp[i]);
+ }
+ runTest(spec);
+
+ for (int i = 0; i < outputArity; i++) {
+ compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index c0d0211..75a4830 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -14,7 +14,9 @@
*/
package edu.uci.ics.hyracks.tests.integration;
+import java.io.DataOutput;
import java.io.File;
+import java.io.IOException;
import org.junit.Test;
@@ -23,8 +25,11 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
@@ -51,6 +56,29 @@
public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
private static final boolean DEBUG = true;
+ static private class NoopNullWriterFactory implements INullWriterFactory {
+
+ private static final long serialVersionUID = 1L;
+ public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();
+
+ private NoopNullWriterFactory() {
+ }
+
+ @Override
+ public INullWriter createNullWriter() {
+ return new INullWriter() {
+ @Override
+ public void writeNull(DataOutput out) throws HyracksDataException {
+ try {
+ out.writeShort(0);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+ }
+
/*
* TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
*/
@@ -274,6 +302,251 @@
}
@Test
+ public void customerOrderCIDInMemoryHashLeftOuterJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/customer.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/orders.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
+ new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+ nullWriterFactories, 128);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ // "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+ // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDGraceHashLeftOuterJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/customer.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/orders.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
+ GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+ new int[] { 0 }, new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+ nullWriterFactories);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ // "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+ // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDHybridHashLeftOuterJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/customer.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/orders.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
+ HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+ new int[] { 0 }, new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+ nullWriterFactories);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ // "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+ // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
public void customerOrderCIDJoinMulti() throws Exception {
JobSpecification spec = new JobSpecification();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
new file mode 100644
index 0000000..cab54f4
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -0,0 +1,335 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+
+public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest {
+ private static final boolean DEBUG = true;
+
+ static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final IBinaryComparatorFactory bFactory;
+ private final int pos0;
+ private final int pos1;
+
+ public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
+ this.bFactory = bFactory;
+ this.pos0 = pos0;
+ this.pos1 = pos1;
+ }
+
+ @Override
+ public ITuplePairComparator createTuplePairComparator() {
+ return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
+ }
+ }
+
+ static class JoinComparator implements ITuplePairComparator {
+
+ private final IBinaryComparator bComparator;
+ private final int field0;
+ private final int field1;
+
+ public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
+ this.bComparator = bComparator;
+ this.field0 = field0;
+ this.field1 = field1;
+ }
+
+ @Override
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+ int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+ int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+ .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ return 0;
+ }
+ }
+
+ /*
+ * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL,
+ * C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY
+ * INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT
+ * NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL );
+ * TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL,
+ * O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE
+ * DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY
+ * CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT
+ * NULL, O_COMMENT VARCHAR(79) NOT NULL );
+ */
+ @Test
+ public void customerOrderCIDJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/customer.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/orders.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+ UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 4);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDJoinMulti() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+ NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+ UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 5);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDJoinAutoExpand() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+ NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+ UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 6);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
new file mode 100644
index 0000000..b1089fe
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+
+public class UnionTest extends AbstractIntegrationTest {
+ @Test
+ public void union01() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit(NC2_ID, new FileReference(new File("data/words.txt"))),
+ new FileSplit(NC1_ID, new FileReference(new File("data/words.txt"))) });
+
+ RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor csvScanner01 = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner01, NC2_ID, NC1_ID);
+
+ FileScanOperatorDescriptor csvScanner02 = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner02, NC2_ID, NC1_ID);
+
+ UnionAllOperatorDescriptor unionAll = new UnionAllOperatorDescriptor(spec, 2, desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, unionAll, NC2_ID, NC1_ID);
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner01, 0, unionAll, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner02, 0, unionAll, 1);
+ spec.connect(new OneToOneConnectorDescriptor(spec), unionAll, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
index 40cb7da..0d5dc8f 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
@@ -32,6 +32,8 @@
public int getTokenLength();
+ public int getNumTokens();
+
public void writeToken(DataOutput dos) throws IOException;
public RecordDescriptor getTokenSchema();
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
index 73635f9..425436b 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
@@ -30,6 +30,7 @@
new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
private final char delimiter;
+ private final byte typeTag;
private byte[] data;
private int start;
private int length;
@@ -38,8 +39,14 @@
private int tokenStart;
private int pos;
+ public DelimitedUTF8StringBinaryTokenizer(char delimiter, byte typeTag) {
+ this.delimiter = delimiter;
+ this.typeTag = typeTag;
+ }
+
public DelimitedUTF8StringBinaryTokenizer(char delimiter) {
this.delimiter = delimiter;
+ this.typeTag = -1;
}
@Override
@@ -88,6 +95,9 @@
@Override
public void writeToken(DataOutput dos) throws IOException {
+ if (typeTag > 0)
+ dos.write(typeTag);
+
// WARNING: 2-byte length indicator is specific to UTF-8
dos.writeShort((short) tokenLength);
dos.write(data, tokenStart, tokenLength);
@@ -97,4 +107,10 @@
public RecordDescriptor getTokenSchema() {
return tokenSchema;
}
+
+ // cannot be implemented for this tokenizer
+ @Override
+ public int getNumTokens() {
+ return -1;
+ }
}
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
index e3e0be3..2e85db5 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
@@ -22,13 +22,20 @@
private static final long serialVersionUID = 1L;
private final char delimiter;
+ private final byte typeTag;
+
+ public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter, byte typeTag) {
+ this.delimiter = delimiter;
+ this.typeTag = typeTag;
+ }
public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter) {
this.delimiter = delimiter;
+ this.typeTag = -1;
}
@Override
public IBinaryTokenizer createBinaryTokenizer() {
- return new DelimitedUTF8StringBinaryTokenizer(delimiter);
+ return new DelimitedUTF8StringBinaryTokenizer(delimiter, typeTag);
}
}
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
index 54fc371..2cc0b6c 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
@@ -142,4 +142,9 @@
public RecordDescriptor getTokenSchema() {
return tokenSchema;
}
-}
+
+ @Override
+ public int getNumTokens() {
+ return 0;
+ }
+}
\ No newline at end of file