Merged hyracks_aqua_changes@405:431 into trunk

Summary of the merged changes: ICCApplicationContext now exposes an ICCContext
carrying the cluster's IP-address-to-node-name map; contributeSchedulingConstraints
on operator and connector descriptors takes the application context; new
INullWriter/INullWriterFactory and ITuplePairComparator/ITuplePairComparatorFactory
interfaces are added; the Grace and Hybrid hash joins gain left-outer-join support;
OperatorRunnable drives multiple inputs by fanning readers out to the executor;
FrameTupleAppender gains a second appendConcat overload; Joblet,
NodeControllerService, and Stagelet receive thread-safety fixes.
git-svn-id: https://hyracks.googlecode.com/svn/trunk@432 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index a431939..0987bd2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -16,6 +16,7 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
@@ -25,4 +26,6 @@
public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory);
public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+
+ public ICCContext getCCContext();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
new file mode 100644
index 0000000..266ebe9
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface ICCContext {
+ public Map<String, Set<String>> getIPAddressNodeMap();
+}
\ No newline at end of file
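A minimal consumption sketch (not part of this commit) showing the shape of the new
interface; the demo map contents and node names are made up:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import edu.uci.ics.hyracks.api.context.ICCContext;

    public class ICCContextDemo {
        public static void main(String[] args) {
            // Stand-in for the map the cluster controller maintains.
            final Map<String, Set<String>> ipToNodes = new HashMap<String, Set<String>>();
            ipToNodes.put("10.0.0.1", new HashSet<String>(java.util.Arrays.asList("nc1", "nc2")));

            ICCContext ctx = new ICCContext() {
                @Override
                public Map<String, Set<String>> getIPAddressNodeMap() {
                    return ipToNodes;
                }
            };
            for (Map.Entry<String, Set<String>> e : ctx.getIPAddressNodeMap().entrySet()) {
                System.out.println(e.getKey() + " hosts " + e.getValue());
            }
        }
    }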
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 362306a..3dcf620 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -19,6 +19,7 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -93,7 +94,8 @@
* @param plan
* - Job Plan
*/
- public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan);
+ public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+ ICCApplicationContext appCtx);
/**
* Translate this connector descriptor to JSON.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index e58ac99..9e8dbcb 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -19,6 +19,7 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobPlan;
@@ -73,7 +74,8 @@
* @param plan
* - Job Plan
*/
- public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan);
+ public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+ ICCApplicationContext appCtx);
/**
* Translates this operator descriptor to JSON.
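The extra ICCApplicationContext parameter lets constraint contributors see cluster
state. A hedged, hypothetical helper that a descriptor's contributeSchedulingConstraints
override could delegate to; the IConstraintExpressionAcceptor calls are elided because
that API is not shown in this diff:

    import java.util.Map;
    import java.util.Set;

    import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
    import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
    import edu.uci.ics.hyracks.api.job.JobPlan;

    public class TopologyAwareConstraints {
        public static void contribute(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
                ICCApplicationContext appCtx) {
            Map<String, Set<String>> ipToNodes = appCtx.getCCContext().getIPAddressNodeMap();
            for (Map.Entry<String, Set<String>> e : ipToNodes.entrySet()) {
                // A descriptor could, e.g., restrict a partition to nodes that share
                // one machine; the resulting expressions would go to constraintAcceptor.
                System.out.println("machine " + e.getKey() + " hosts " + e.getValue());
            }
        }
    }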
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
new file mode 100644
index 0000000..7552c17
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface INullWriter {
+ public void writeNull(DataOutput out) throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
new file mode 100644
index 0000000..6d9a744
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INullWriterFactory extends Serializable {
+ public INullWriter createNullWriter();
+}
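A minimal sketch of the factory/writer pair, assuming a hypothetical convention that
encodes null as a single zero byte and that HyracksDataException can wrap an IOException:

    import java.io.DataOutput;
    import java.io.IOException;

    import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
    import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

    public class ZeroByteNullWriterFactory implements INullWriterFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public INullWriter createNullWriter() {
            return new INullWriter() {
                @Override
                public void writeNull(DataOutput out) throws HyracksDataException {
                    try {
                        out.writeByte(0); // hypothetical null encoding
                    } catch (IOException e) {
                        throw new HyracksDataException(e);
                    }
                }
            };
        }
    }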
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
new file mode 100644
index 0000000..3251944
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITuplePairComparator {
+
+ public int compare(IFrameTupleAccessor outerRef, int outerIndex, IFrameTupleAccessor innerRef, int innerIndex)
+ throws HyracksDataException;
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
new file mode 100644
index 0000000..26cb525
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITuplePairComparatorFactory extends Serializable {
+
+ public ITuplePairComparator createTuplePairComparator();
+}
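A hedged sketch of a factory whose comparator compares one int field from each side;
the offset arithmetic follows the usual Hyracks frame layout (tuple start + field-slots
length + relative field offset), and the field indices are illustrative:

    import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
    import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
    import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

    public class IntFieldTuplePairComparatorFactory implements ITuplePairComparatorFactory {
        private static final long serialVersionUID = 1L;
        private final int outerField;
        private final int innerField;

        public IntFieldTuplePairComparatorFactory(int outerField, int innerField) {
            this.outerField = outerField;
            this.innerField = innerField;
        }

        @Override
        public ITuplePairComparator createTuplePairComparator() {
            return new ITuplePairComparator() {
                @Override
                public int compare(IFrameTupleAccessor outerRef, int outerIndex,
                        IFrameTupleAccessor innerRef, int innerIndex) throws HyracksDataException {
                    int o = readInt(outerRef, outerIndex, outerField);
                    int i = readInt(innerRef, innerIndex, innerField);
                    return o < i ? -1 : (o > i ? 1 : 0);
                }

                private int readInt(IFrameTupleAccessor acc, int tIndex, int fIndex) {
                    int off = acc.getTupleStartOffset(tIndex) + acc.getFieldSlotsLength()
                            + acc.getFieldStartOffset(tIndex, fIndex);
                    return acc.getBuffer().getInt(off);
                }
            };
        }
    }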
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index a4aa482..e5efe92 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -23,6 +23,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
@@ -33,9 +34,11 @@
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.control.CCConfig;
import edu.uci.ics.hyracks.api.control.IClusterController;
import edu.uci.ics.hyracks.api.control.INodeController;
+import edu.uci.ics.hyracks.api.control.NCConfig;
import edu.uci.ics.hyracks.api.control.NodeParameters;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -76,6 +79,8 @@
private final Map<String, NodeControllerState> nodeRegistry;
+ private final Map<String, Set<String>> ipAddressNodeNameMap;
+
private final Map<String, CCApplicationContext> applications;
private final ServerContext serverCtx;
@@ -94,9 +99,12 @@
private final Timer timer;
+ private final ICCContext ccContext;
+
public ClusterControllerService(CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+ ipAddressNodeNameMap = new HashMap<String, Set<String>>();
applications = new Hashtable<String, CCApplicationContext>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(
ClusterControllerService.class.getName()));
@@ -106,6 +114,12 @@
jobQueue = new JobQueue();
scheduler = new NaiveScheduler(this);
this.timer = new Timer(true);
+ ccContext = new ICCContext() {
+ @Override
+ public Map<String, Set<String>> getIPAddressNodeMap() {
+ return ipAddressNodeNameMap;
+ }
+ };
}
@Override
@@ -153,6 +167,10 @@
return nodeRegistry;
}
+ public Map<String, Set<String>> getIPAddressNodeNameMap() {
+ return ipAddressNodeNameMap;
+ }
+
public CCConfig getConfig() {
return ccConfig;
}
@@ -169,7 +187,8 @@
@Override
public NodeParameters registerNode(INodeController nodeController) throws Exception {
String id = nodeController.getId();
- NodeControllerState state = new NodeControllerState(nodeController);
+ NCConfig ncConfig = nodeController.getConfiguration();
+ NodeControllerState state = new NodeControllerState(nodeController, ncConfig);
jobQueue.scheduleAndSync(new RegisterNodeEvent(this, id, state));
nodeController.notifyRegistration(this);
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
@@ -242,7 +261,7 @@
if (applications.containsKey(appName)) {
throw new HyracksException("Duplicate application with name: " + appName + " being created.");
}
- CCApplicationContext appCtx = new CCApplicationContext(serverCtx, appName);
+ CCApplicationContext appCtx = new CCApplicationContext(serverCtx, ccContext, appName);
applications.put(appName, appCtx);
}
}
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index e69281b..40eb9ae 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -5,16 +5,20 @@
import java.util.UUID;
import edu.uci.ics.hyracks.api.control.INodeController;
+import edu.uci.ics.hyracks.api.control.NCConfig;
public class NodeControllerState {
private final INodeController nodeController;
+ private final NCConfig ncConfig;
+
private final Set<UUID> activeJobIds;
private int lastHeartbeatDuration;
- public NodeControllerState(INodeController nodeController) {
+ public NodeControllerState(INodeController nodeController, NCConfig ncConfig) {
this.nodeController = nodeController;
+ this.ncConfig = ncConfig;
activeJobIds = new HashSet<UUID>();
}
@@ -34,6 +38,10 @@
return nodeController;
}
+ public NCConfig getNCConfig() {
+ return ncConfig;
+ }
+
public Set<UUID> getActiveJobIds() {
return activeJobIds;
}
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 5ca0269..96e29d3 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -8,6 +8,7 @@
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
@@ -17,12 +18,15 @@
import edu.uci.ics.hyracks.control.common.context.ServerContext;
public class CCApplicationContext extends ApplicationContext implements ICCApplicationContext {
+ private final ICCContext ccContext;
+
private IJobSpecificationFactory jobSpecFactory;
private List<IJobLifecycleListener> jobLifecycleListeners;
- public CCApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+ public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext, String appName) throws IOException {
super(serverCtx, appName);
+ this.ccContext = ccContext;
jobSpecFactory = DeserializingJobSpecificationFactory.INSTANCE;
jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
}
@@ -33,6 +37,10 @@
bootstrap.start();
}
+ public ICCContext getCCContext() {
+ return ccContext;
+ }
+
@Override
public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory) {
this.jobSpecFactory = jobSpecFactory;
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
index 3b03d25..d7f244e 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
@@ -60,7 +60,7 @@
throw new HyracksException("No application with id " + appName + " found");
}
JobSpecification spec = appCtx.createJobSpecification(jobId, jobSpec);
- JobRun run = plan(jobId, spec, jobFlags);
+ JobRun run = plan(jobId, spec, appCtx, jobFlags);
run.setStatus(JobStatus.INITIALIZED);
ccs.getRunMap().put(jobId, run);
@@ -71,7 +71,8 @@
return jobId;
}
- private JobRun plan(UUID jobId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ private JobRun plan(UUID jobId, JobSpecification jobSpec, final CCApplicationContext appCtx,
+ EnumSet<JobFlag> jobFlags) throws Exception {
final JobPlanBuilder builder = new JobPlanBuilder();
builder.init(appName, jobId, jobSpec, jobFlags);
PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
@@ -92,13 +93,13 @@
PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
@Override
public void visit(IOperatorDescriptor op) {
- op.contributeSchedulingConstraints(acceptor, plan);
+ op.contributeSchedulingConstraints(acceptor, plan, appCtx);
}
});
PlanUtils.visit(jobSpec, new IConnectorDescriptorVisitor() {
@Override
public void visit(IConnectorDescriptor conn) {
- conn.contributeSchedulingConstraints(acceptor, plan);
+ conn.contributeSchedulingConstraints(acceptor, plan, appCtx);
}
});
contributedConstraints.addAll(jobSpec.getUserConstraints());
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
index 0754dc1..3df302f 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
@@ -14,7 +14,9 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -38,5 +40,13 @@
throw new Exception("Node with this name already registered.");
}
nodeMap.put(nodeId, state);
+ Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
+ String ipAddress = state.getNCConfig().dataIPAddress;
+ Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
+ if (nodes == null) {
+ nodes = new HashSet<String>();
+ ipAddressNodeNameMap.put(ipAddress, nodes);
+ }
+ nodes.add(nodeId);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 38fb7ae..dca0dee 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -44,6 +44,7 @@
LOGGER.info(e.getKey() + " considered dead");
}
}
+ Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
for (String deadNode : deadNodes) {
NodeControllerState state = nodeMap.remove(deadNode);
for (final UUID jid : state.getActiveJobIds()) {
@@ -52,6 +53,13 @@
LOGGER.info("Aborting: " + jid);
ccs.getJobQueue().schedule(new JobAbortEvent(ccs, jid, lastAttempt));
}
+ String ipAddress = state.getNCConfig().dataIPAddress;
+ Set<String> ipNodes = ipAddressNodeNameMap.get(ipAddress);
+ if (ipNodes != null) {
+ if (ipNodes.remove(deadNode) && ipNodes.isEmpty()) {
+ ipAddressNodeNameMap.remove(ipAddress);
+ }
+ }
}
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 6cc7450..413243e 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
@@ -65,9 +66,9 @@
this.appCtx = appCtx;
this.jobId = jobId;
this.attempt = attempt;
- stageletMap = new HashMap<UUID, Stagelet>();
- envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
- counterMap = new HashMap<String, Counter>();
+ stageletMap = new Hashtable<UUID, Stagelet>();
+ envMap = new Hashtable<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+ counterMap = new Hashtable<String, Counter>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
}
@@ -78,8 +79,10 @@
}
public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
- if (!envMap.containsKey(hod.getOperatorId())) {
- envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+ synchronized (envMap) {
+ if (!envMap.containsKey(hod.getOperatorId())) {
+ envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+ }
}
Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
if (!opEnvMap.containsKey(partition)) {
@@ -118,7 +121,7 @@
return nodeController.getExecutor();
}
- public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
+ public void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
stageletMap.remove(stageId);
nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
}
@@ -132,15 +135,19 @@
return nodeController;
}
- public synchronized void dumpProfile(JobletProfile jProfile) {
- Map<String, Long> counters = jProfile.getCounters();
- for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
- counters.put(e.getKey(), e.getValue().get());
+ public void dumpProfile(JobletProfile jProfile) {
+ synchronized (counterMap) {
+ Map<String, Long> counters = jProfile.getCounters();
+ for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
+ counters.put(e.getKey(), e.getValue().get());
+ }
}
- for (Stagelet si : stageletMap.values()) {
- StageletProfile sProfile = new StageletProfile(si.getStageId());
- si.dumpProfile(sProfile);
- jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+ synchronized (stageletMap) {
+ for (Stagelet si : stageletMap.values()) {
+ StageletProfile sProfile = new StageletProfile(si.getStageId());
+ si.dumpProfile(sProfile);
+ jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+ }
}
}
@@ -193,12 +200,14 @@
}
@Override
- public synchronized ICounter getCounter(String name, boolean create) {
- Counter counter = counterMap.get(name);
- if (counter == null && create) {
- counter = new Counter(name);
- counterMap.put(name, counter);
+ public ICounter getCounter(String name, boolean create) {
+ synchronized (counterMap) {
+ Counter counter = counterMap.get(name);
+ if (counter == null && create) {
+ counter = new Counter(name);
+ counterMap.put(name, counter);
+ }
+ return counter;
}
- return counter;
}
}
\ No newline at end of file
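The switch from HashMap to Hashtable plus explicit synchronized blocks above makes the
check-then-act on envMap atomic; without it, two threads could both miss the key and
both insert, losing one of the inserted values. A generic, self-contained sketch of the
pattern (names hypothetical):

    import java.util.HashMap;
    import java.util.Hashtable;
    import java.util.Map;

    public class AtomicPutIfAbsent<K, V> {
        private final Map<K, Map<Integer, V>> map = new Hashtable<K, Map<Integer, V>>();

        public Map<Integer, V> getOrCreate(K key) {
            synchronized (map) { // same monitor Hashtable's own methods lock on
                Map<Integer, V> inner = map.get(key);
                if (inner == null) {
                    inner = new HashMap<Integer, V>();
                    map.put(key, inner); // cannot race with another creator
                }
                return inner;
            }
        }
    }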
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 2702386..293e626 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -129,7 +129,7 @@
}
nodeCapability = computeNodeCapability();
connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
- jobletMap = new HashMap<UUID, Joblet>();
+ jobletMap = new Hashtable<UUID, Joblet>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
NodeControllerService.class.getName()), id));
@@ -251,13 +251,11 @@
for (int i : tasks.get(hanId)) {
IOperatorNodePushable hon = han.createPushRuntime(stagelet, joblet.getEnvironment(op, i), rdp, i,
opNumPartitions.get(op.getOperatorId()));
- OperatorRunnable or = new OperatorRunnable(stagelet, hon);
+ OperatorRunnable or = new OperatorRunnable(stagelet, hon, inputs == null ? 0 : inputs.size(),
+ executor);
stagelet.setOperator(op.getOperatorId(), i, or);
if (inputs != null) {
for (int j = 0; j < inputs.size(); ++j) {
- if (j >= 1) {
- throw new IllegalStateException();
- }
IConnectorDescriptor conn = inputs.get(j);
OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
.getOperatorId();
@@ -276,7 +274,7 @@
portMap.put(piId, endpoint);
IFrameReader reader = createReader(stagelet, conn, drlf, i, plan, stagelet,
opNumPartitions.get(producerOpId), opNumPartitions.get(consumerOpId));
- or.setFrameReader(reader);
+ or.setFrameReader(j, reader);
}
}
honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
@@ -438,19 +436,20 @@
si.setEndpointList(null);
}
- private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
+ private Joblet getLocalJoblet(UUID jobId) throws Exception {
Joblet ji = jobletMap.get(jobId);
return ji;
}
- private synchronized Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx)
- throws Exception {
- Joblet ji = jobletMap.get(jobId);
- if (ji == null || ji.getAttempt() != attempt) {
- ji = new Joblet(this, jobId, attempt, appCtx);
- jobletMap.put(jobId, ji);
+ private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx) throws Exception {
+ synchronized (jobletMap) {
+ Joblet ji = jobletMap.get(jobId);
+ if (ji == null || ji.getAttempt() != attempt) {
+ ji = new Joblet(this, jobId, attempt, appCtx);
+ jobletMap.put(jobId, ji);
+ }
+ return ji;
}
- return ji;
}
public Executor getExecutor() {
@@ -458,7 +457,7 @@
}
@Override
- public synchronized void cleanUpJob(UUID jobId) throws Exception {
+ public void cleanUpJob(UUID jobId) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Cleaning up after job: " + jobId);
}
@@ -563,7 +562,7 @@
}
@Override
- public synchronized void abortJoblet(UUID jobId, int attempt) throws Exception {
+ public void abortJoblet(UUID jobId, int attempt) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Aborting Job: " + jobId + ":" + attempt);
}
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index eb283c2..68b2cad 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -146,9 +146,13 @@
});
}
- protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
- pendingOperators.remove(opIId);
- if (pendingOperators.isEmpty()) {
+ protected void notifyOperatorCompletion(OperatorInstanceId opIId) {
+ boolean done = false;
+ synchronized (pendingOperators) {
+ pendingOperators.remove(opIId);
+ done = pendingOperators.isEmpty();
+ }
+ if (done) {
try {
StageletProfile sProfile = new StageletProfile(stageId);
dumpProfile(sProfile);
@@ -227,7 +231,7 @@
}
@Override
- public ICounter getCounter(String name, boolean create) {
+ public synchronized ICounter getCounter(String name, boolean create) {
Counter counter = counterMap.get(name);
if (counter == null && create) {
counter = new Counter(name);
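The notifyOperatorCompletion rewrite above follows a common concurrency rule: hold the
lock only for the state update, and run the slow follow-up (here, profiling and the
completion notification) outside it. A generic sketch of that shape (names hypothetical):

    import java.util.HashSet;
    import java.util.Set;

    public class CompletionTracker<T> {
        private final Set<T> pending = new HashSet<T>();

        public CompletionTracker(Set<T> initial) {
            pending.addAll(initial);
        }

        public void complete(T item) {
            boolean done;
            synchronized (pending) {
                pending.remove(item);
                done = pending.isEmpty(); // decide under the lock
            }
            if (done) {
                onAllComplete(); // slow work happens outside the lock
            }
        }

        protected void onAllComplete() {
            System.out.println("all items complete");
        }
    }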
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index eca7fd0..8edd992 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.control.nc.runtime;
import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,22 +26,27 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class OperatorRunnable implements Runnable {
+ private final IHyracksStageletContext ctx;
private final IOperatorNodePushable opNode;
- private IFrameReader reader;
- private ByteBuffer buffer;
+ private final int nInputs;
+ private final Executor executor;
+ private IFrameReader[] readers;
private volatile boolean abort;
- public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode) {
+ public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode, int nInputs, Executor executor) {
+ this.ctx = ctx;
this.opNode = opNode;
- buffer = ctx.allocateFrame();
+ this.nInputs = nInputs;
+ this.executor = executor;
+ readers = new IFrameReader[nInputs];
}
public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
opNode.setOutputFrameWriter(index, writer, recordDesc);
}
- public void setFrameReader(IFrameReader reader) {
- this.reader = reader;
+ public void setFrameReader(int inputIdx, IFrameReader reader) {
+ this.readers[inputIdx] = reader;
}
public void abort() {
@@ -50,20 +57,28 @@
public void run() {
try {
opNode.initialize();
- if (reader != null) {
- IFrameWriter writer = opNode.getInputFrameWriter(0);
- writer.open();
- reader.open();
- while (readFrame()) {
- if (abort) {
- break;
- }
- buffer.flip();
- writer.nextFrame(buffer);
- buffer.compact();
+ if (nInputs > 0) {
+ final Semaphore sem = new Semaphore(nInputs - 1);
+ for (int i = 1; i < nInputs; ++i) {
+ final IFrameReader reader = readers[i];
+ final IFrameWriter writer = opNode.getInputFrameWriter(i);
+ sem.acquire();
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ pushFrames(reader, writer);
+ } catch (HyracksDataException e) {
+ } finally {
+ sem.release();
+ }
+ }
+ });
}
- reader.close();
- writer.close();
+ try {
+ pushFrames(readers[0], opNode.getInputFrameWriter(0));
+ } finally {
+ sem.acquire(nInputs - 1);
+ }
}
opNode.deinitialize();
} catch (Exception e) {
@@ -71,8 +86,20 @@
}
}
- protected boolean readFrame() throws HyracksDataException {
- return reader.nextFrame(buffer);
+ private void pushFrames(IFrameReader reader, IFrameWriter writer) throws HyracksDataException {
+ ByteBuffer buffer = ctx.allocateFrame();
+ writer.open();
+ reader.open();
+ while (reader.nextFrame(buffer)) {
+ if (abort) {
+ break;
+ }
+ buffer.flip();
+ writer.nextFrame(buffer);
+ buffer.compact();
+ }
+ reader.close();
+ writer.close();
}
@Override
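The rewritten run() drains input 0 on the calling thread and hands inputs 1..n-1 to the
executor; the semaphore starts with nInputs - 1 permits, each helper holds one while it
pushes frames, and the final acquire(nInputs - 1) blocks until every helper has released
(note the commit swallows HyracksDataException inside the helpers). The barrier idiom in
isolation, as a runnable sketch:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    public class FanOutBarrier {
        public static void main(String[] args) throws InterruptedException {
            final int n = 4; // one stream drained inline, n - 1 on the executor
            ExecutorService executor = Executors.newCachedThreadPool();
            final Semaphore sem = new Semaphore(n - 1);
            for (int i = 1; i < n; ++i) {
                final int stream = i;
                sem.acquire(); // take a permit before handing off
                executor.execute(new Runnable() {
                    public void run() {
                        try {
                            System.out.println("draining stream " + stream);
                        } finally {
                            sem.release();
                        }
                    }
                });
            }
            System.out.println("draining stream 0 inline");
            sem.acquire(n - 1); // blocks until every helper has released
            executor.shutdown();
        }
    }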
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 4ec1b6f..7647e50 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -118,30 +118,32 @@
return false;
}
- public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots, byte[] bytes, int offset,
- int length) {
+ public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
+ int dataLen1) {
int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
int length0 = endOffset0 - startOffset0;
- if (tupleDataEndOffset + length0 + fieldSlots.length * 4 + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+ int slotsLen1 = fieldSlots1.length * 4;
+ int length1 = slotsLen1 + dataLen1;
+
+ if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
ByteBuffer src0 = accessor0.getBuffer();
int slotsLen0 = accessor0.getFieldSlotsLength();
int dataLen0 = length0 - slotsLen0;
// Copy slots from accessor0 verbatim
System.arraycopy(src0.array(), startOffset0, buffer.array(), tupleDataEndOffset, slotsLen0);
- // Copy slots from fieldSlots with the following transformation:
- // newSlotIdx = oldSlotIdx + dataLen0
- for (int i = 0; i < fieldSlots.length; ++i) {
- buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots[i] + dataLen0));
+ // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+ for (int i = 0; i < fieldSlots1.length; ++i) {
+ buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
}
// Copy data0
System.arraycopy(src0.array(), startOffset0 + slotsLen0, buffer.array(), tupleDataEndOffset + slotsLen0
- + fieldSlots.length * 4, dataLen0);
- // Copy bytes
- System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots.length * 4
- + dataLen0, length);
- tupleDataEndOffset += (length0 + fieldSlots.length * 4 + length);
+ + slotsLen1, dataLen0);
+ // Copy bytes1
+ System.arraycopy(bytes1, offset1, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4
+ + dataLen0, dataLen1);
+ tupleDataEndOffset += (length0 + length1);
buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
++tupleCount;
buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
@@ -150,6 +152,41 @@
return false;
}
+ public boolean appendConcat(int[] fieldSlots0, byte[] bytes0, int offset0, int dataLen0, IFrameTupleAccessor accessor1,
+ int tIndex1) {
+ int slotsLen0 = fieldSlots0.length * 4;
+ int length0 = slotsLen0 + dataLen0;
+
+ int startOffset1 = accessor1.getTupleStartOffset(tIndex1);
+ int endOffset1 = accessor1.getTupleEndOffset(tIndex1);
+ int length1 = endOffset1 - startOffset1;
+
+ if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+ ByteBuffer src1 = accessor1.getBuffer();
+ int slotsLen1 = accessor1.getFieldSlotsLength();
+ int dataLen1 = length1 - slotsLen1;
+ // Copy fieldSlots0 verbatim
+ for (int i = 0; i < fieldSlots0.length; ++i) {
+ buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots0[i]);
+ }
+ // Copy slots from accessor1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+ for (int i = 0; i < slotsLen1 / 4; ++i) {
+ buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, src1.getInt(startOffset1 + i * 4) + dataLen0);
+ }
+ // Copy bytes0
+ System.arraycopy(bytes0, offset0, buffer.array(), tupleDataEndOffset + slotsLen0 + slotsLen1,
+ dataLen0);
+ // Copy data1
+ System.arraycopy(src1.array(), startOffset1 + slotsLen1, buffer.array(), tupleDataEndOffset + slotsLen0
+ + slotsLen1 + dataLen0, dataLen1);
+ tupleDataEndOffset += (length0 + length1);
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+ ++tupleCount;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+ return true;
+ }
+ return false;
+ }
public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) {
int fTargetSlotsLength = fields.length * 4;
int length = fTargetSlotsLength;
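Both appendConcat overloads rely on the frame layout in which a tuple's field slots hold
field-end offsets relative to the start of its data area, so the second tuple's slots must
be shifted by the first tuple's data length (newSlotIdx = oldSlotIdx + dataLen0). A small
worked example of just that arithmetic:

    public class SlotShiftExample {
        public static void main(String[] args) {
            int[] fieldSlots0 = { 4, 12 };  // tuple0: two fields of 4 and 8 bytes
            int dataLen0 = 12;              // tuple0's data length (== last slot)
            int[] fieldSlots1 = { 8 };      // tuple1: one 8-byte field
            int[] merged = new int[fieldSlots0.length + fieldSlots1.length];
            System.arraycopy(fieldSlots0, 0, merged, 0, fieldSlots0.length);
            for (int i = 0; i < fieldSlots1.length; ++i) {
                // tuple1's data now starts dataLen0 bytes into the merged data area
                merged[fieldSlots0.length + i] = fieldSlots1[i] + dataLen0; // 8 -> 20
            }
            System.out.println(java.util.Arrays.toString(merged)); // [4, 12, 20]
        }
    }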
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 0eebd93..e4613dd 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -19,6 +19,7 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -50,7 +51,8 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+ public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+ ICCApplicationContext appCtx) {
// do nothing
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 7e97975..4453412 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -19,6 +19,7 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -68,7 +69,8 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+ public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+ ICCApplicationContext appCtx) {
// do nothing
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index a038a40..6ee9ab37 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.dataflow.std.connectors;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -52,7 +53,8 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+ public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+ ICCApplicationContext appCtx) {
JobSpecification jobSpec = plan.getJobSpecification();
IOperatorDescriptor consumer = jobSpec.getConsumer(this);
IOperatorDescriptor producer = jobSpec.getProducer(this);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index bff101e..1a69952 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -44,8 +46,8 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final String SMALLRELATION = "RelR";
- private static final String LARGERELATION = "RelS";
+ private static final String RELATION0 = "Rel0";
+ private static final String RELATION1 = "Rel1";
private static final long serialVersionUID = 1L;
private final int[] keys0;
@@ -56,6 +58,8 @@
private final double factor;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
@@ -69,24 +73,44 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+ INullWriterFactory[] nullWriterFactories1) {
+ super(spec, 2, 1);
+ this.memsize = memsize;
+ this.inputsize0 = inputsize0;
+ this.recordsPerFrame = recordsPerFrame;
+ this.factor = factor;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
recordDescriptors[0] = recordDescriptor;
}
@Override
public void contributeTaskGraph(IActivityGraphBuilder builder) {
- HashPartitionActivityNode rpart = new HashPartitionActivityNode(SMALLRELATION, keys0, 0);
- HashPartitionActivityNode spart = new HashPartitionActivityNode(LARGERELATION, keys1, 1);
+ HashPartitionActivityNode part0 = new HashPartitionActivityNode(RELATION0, keys0, 0);
+ HashPartitionActivityNode part1 = new HashPartitionActivityNode(RELATION1, keys1, 1);
JoinActivityNode join = new JoinActivityNode();
- builder.addTask(rpart);
- builder.addSourceEdge(0, rpart, 0);
+ builder.addTask(part0);
+ builder.addSourceEdge(0, part0, 0);
- builder.addTask(spart);
- builder.addSourceEdge(1, spart, 0);
+ builder.addTask(part1);
+ builder.addSourceEdge(1, part1, 0);
builder.addTask(join);
- builder.addBlockingEdge(rpart, spart);
- builder.addBlockingEdge(spart, join);
+ builder.addBlockingEdge(part0, part1);
+ builder.addBlockingEdge(part1, join);
builder.addTargetEdge(0, join, 0);
}
@@ -217,18 +241,24 @@
}
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
private InMemoryHashJoin joiner;
- private RunFileWriter[] rWriters;
- private RunFileWriter[] sWriters;
+ private RunFileWriter[] buildWriters;
+ private RunFileWriter[] probeWriters;
private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
@Override
public void initialize() throws HyracksDataException {
- rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
- sWriters = (RunFileWriter[]) env.get(LARGERELATION);
+ buildWriters = (RunFileWriter[]) env.get(RELATION1);
+ probeWriters = (RunFileWriter[]) env.get(RELATION0);
ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
@@ -241,34 +271,36 @@
// buffer
int tableSize = (int) (numPartitions * recordsPerFrame * factor);
for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
- RunFileWriter rWriter = rWriters[partitionid];
- RunFileWriter sWriter = sWriters[partitionid];
- if (rWriter == null || sWriter == null) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
+ if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
- new FrameTuplePairComparator(keys0, keys1, comparators));
+ new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
// build
- RunFileReader rReader = rWriter.createReader();
- rReader.open();
- while (rReader.nextFrame(buffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame();
- FrameUtils.copy(buffer, copyBuffer);
- joiner.build(copyBuffer);
- buffer.clear();
+ if (buildWriter != null) {
+ RunFileReader buildReader = buildWriter.createReader();
+ buildReader.open();
+ while (buildReader.nextFrame(buffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(buffer, copyBuffer);
+ joiner.build(copyBuffer);
+ buffer.clear();
+ }
+ buildReader.close();
}
- rReader.close();
// probe
- RunFileReader sReader = sWriter.createReader();
- sReader.open();
- while (sReader.nextFrame(buffer)) {
+ RunFileReader probeReader = probeWriter.createReader();
+ probeReader.open();
+ while (probeReader.nextFrame(buffer)) {
joiner.join(buffer, writer);
buffer.clear();
}
- sReader.close();
+ probeReader.close();
joiner.closeJoin(writer);
}
writer.close();
@@ -276,8 +308,8 @@
@Override
public void deinitialize() throws HyracksDataException {
- env.set(LARGERELATION, null);
- env.set(SMALLRELATION, null);
+ env.set(RELATION1, null);
+ env.set(RELATION0, null);
}
};
return op;
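A hedged construction sketch for the new left-outer variant; only the constructor shape
comes from this commit, and all factory and descriptor arguments are caller-supplied
placeholders:

    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.api.job.JobSpecification;
    import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;

    public class LeftOuterGraceJoinExample {
        // Numeric tuning values below are illustrative only.
        public static GraceHashJoinOperatorDescriptor create(JobSpecification spec,
                IBinaryHashFunctionFactory[] hashFns, IBinaryComparatorFactory[] cmps,
                RecordDescriptor joinedRd, INullWriterFactory[] nullWriters1) {
            return new GraceHashJoinOperatorDescriptor(spec,
                    32,              // memsize (frames)
                    1000,            // inputsize0 (frames)
                    200,             // recordsPerFrame
                    1.2,             // factor
                    new int[] { 0 }, // keys0: join column of input 0
                    new int[] { 0 }, // keys1: join column of input 1
                    hashFns, cmps, joinedRd,
                    true,            // isLeftOuter: emit unmatched input-0 tuples
                    nullWriters1);   // pads input-1 fields for unmatched tuples
        }
    }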
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index d7c1086..516bb88 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -46,19 +48,21 @@
public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final String JOINER0 = "joiner0";
- private static final String SMALLRELATION = "RelR";
- private static final String LARGERELATION = "RelS";
+ private static final String BUILDRELATION = "BuildRel";
+ private static final String PROBERELATION = "ProbeRel";
private static final String MEM_HASHTABLE = "MEMORY_HASHTABLE";
private static final String NUM_PARTITION = "NUMBER_B_PARTITIONS"; // B
private final int memsize;
private static final long serialVersionUID = 1L;
private final int inputsize0;
private final double factor;
+ private final int recordsPerFrame;
private final int[] keys0;
private final int[] keys1;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
- private final int recordsPerFrame;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
/**
* @param spec
@@ -88,19 +92,39 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ public HybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+ INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+ super(spec, 2, 1);
+ this.memsize = memsize;
+ this.inputsize0 = inputsize0;
+ this.factor = factor;
+ this.recordsPerFrame = recordsPerFrame;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
recordDescriptors[0] = recordDescriptor;
}
@Override
public void contributeTaskGraph(IActivityGraphBuilder builder) {
- BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(SMALLRELATION);
- PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(LARGERELATION);
+ BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(BUILDRELATION);
+ PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(PROBERELATION);
builder.addTask(phase1);
- builder.addSourceEdge(0, phase1, 0);
+ builder.addSourceEdge(1, phase1, 0);
builder.addTask(phase2);
- builder.addSourceEdge(1, phase2, 0);
+ builder.addSourceEdge(0, phase2, 0);
builder.addBlockingEdge(phase1, phase2);
@@ -127,12 +151,18 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private InMemoryHashJoin joiner0;
- private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
- ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
- .createPartitioner();
+ private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
+ private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
+ hashFunctionFactories).createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
private ByteBuffer[] bufferForPartitions;
@@ -148,8 +178,8 @@
for (int i = 0; i < B; i++) {
ByteBuffer buf = bufferForPartitions[i];
- accessor0.reset(buf);
- if (accessor0.getTupleCount() > 0) {
+ accessorBuild.reset(buf);
+ if (accessorBuild.getTupleCount() > 0) {
write(i, buf);
}
closeWriter(i);
@@ -165,17 +195,17 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (memoryForHashtable != memsize - 2) {
- accessor0.reset(buffer);
- int tCount = accessor0.getTupleCount();
+ accessorBuild.reset(buffer);
+ int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
int entry = -1;
if (memoryForHashtable == 0) {
- entry = hpc0.partition(accessor0, i, B);
+ entry = hpcBuild.partition(accessorBuild, i, B);
boolean newBuffer = false;
ByteBuffer bufBi = bufferForPartitions[entry];
while (true) {
appender.reset(bufBi, newBuffer);
- if (appender.append(accessor0, i)) {
+ if (appender.append(accessorBuild, i)) {
break;
} else {
write(entry, bufBi);
@@ -184,15 +214,16 @@
}
}
} else {
- entry = hpc0.partition(accessor0, i, (int) (inputsize0 * factor / nPartitions));
+ entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
if (entry < memoryForHashtable) {
while (true) {
- if (!ftappender.append(accessor0, i)) {
+ if (!ftappender.append(accessorBuild, i)) {
build(inBuffer);
ftappender.reset(inBuffer, true);
- } else
+ } else {
break;
+ }
}
} else {
entry %= B;
@@ -200,7 +231,7 @@
ByteBuffer bufBi = bufferForPartitions[entry];
while (true) {
appender.reset(bufBi, newBuffer);
- if (appender.append(accessor0, i)) {
+ if (appender.append(accessorBuild, i)) {
break;
} else {
write(entry, bufBi);
@@ -255,7 +286,7 @@
int tableSize = (int) (memoryForHashtable * recordsPerFrame * factor);
joiner0 = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
- keys0, keys1, comparators));
+ keys0, keys1, comparators), isLeftOuter, nullWriters1);
bufferForPartitions = new ByteBuffer[B];
fWriters = new RunFileWriter[B];
for (int i = 0; i < B; i++) {
@@ -298,11 +329,11 @@
private class PartitionAndJoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- private String largeRelation;
+ private String relationName;
public PartitionAndJoinActivityNode(String relationName) {
super();
- this.largeRelation = relationName;
+ this.relationName = relationName;
}
@Override
@@ -315,22 +346,28 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private InMemoryHashJoin joiner0;
- private final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
- private ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
+ private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+ private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
hashFunctionFactories);
- private ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
+ private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
hashFunctionFactories);
- ITuplePartitionComputer hpc1 = hpcf1.createPartitioner();
+ private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
private final FrameTupleAppender ftap = new FrameTupleAppender(ctx.getFrameSize());
private final ByteBuffer inBuffer = ctx.allocateFrame();
private final ByteBuffer outBuffer = ctx.allocateFrame();
- private RunFileWriter[] rWriters;
- private RunFileWriter[] sWriters;
+ private RunFileWriter[] buildWriters;
+ private RunFileWriter[] probeWriters;
private ByteBuffer[] bufferForPartitions;
private int B;
private int memoryForHashtable;
@@ -339,10 +376,10 @@
public void open() throws HyracksDataException {
joiner0 = (InMemoryHashJoin) env.get(JOINER0);
writer.open();
- rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
+ buildWriters = (RunFileWriter[]) env.get(BUILDRELATION);
B = (Integer) env.get(NUM_PARTITION);
memoryForHashtable = (Integer) env.get(MEM_HASHTABLE);
- sWriters = new RunFileWriter[B];
+ probeWriters = new RunFileWriter[B];
bufferForPartitions = new ByteBuffer[B];
for (int i = 0; i < B; i++) {
bufferForPartitions[i] = ctx.allocateFrame();
@@ -354,18 +391,18 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (memoryForHashtable != memsize - 2) {
- accessor1.reset(buffer);
- int tupleCount1 = accessor1.getTupleCount();
- for (int i = 0; i < tupleCount1; ++i) {
+ accessorProbe.reset(buffer);
+ int tupleCount0 = accessorProbe.getTupleCount();
+ for (int i = 0; i < tupleCount0; ++i) {
int entry = -1;
if (memoryForHashtable == 0) {
- entry = hpc1.partition(accessor1, i, B);
+ entry = hpcProbe.partition(accessorProbe, i, B);
boolean newBuffer = false;
ByteBuffer outbuf = bufferForPartitions[entry];
while (true) {
appender.reset(outbuf, newBuffer);
- if (appender.append(accessor1, i)) {
+ if (appender.append(accessorProbe, i)) {
break;
} else {
write(entry, outbuf);
@@ -374,10 +411,10 @@
}
}
} else {
- entry = hpc1.partition(accessor1, i, (int) (inputsize0 * factor / nPartitions));
+ entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
if (entry < memoryForHashtable) {
while (true) {
- if (!ftap.append(accessor1, i)) {
+ if (!ftap.append(accessorProbe, i)) {
joiner0.join(inBuffer, writer);
ftap.reset(inBuffer, true);
} else
@@ -390,7 +427,7 @@
ByteBuffer outbuf = bufferForPartitions[entry];
while (true) {
appender.reset(outbuf, newBuffer);
- if (appender.append(accessor1, i)) {
+ if (appender.append(accessorProbe, i)) {
break;
} else {
write(entry, outbuf);
@@ -415,8 +452,8 @@
if (memoryForHashtable != memsize - 2) {
for (int i = 0; i < B; i++) {
ByteBuffer buf = bufferForPartitions[i];
- accessor1.reset(buf);
- if (accessor1.getTupleCount() > 0) {
+ accessorProbe.reset(buf);
+ if (accessorProbe.getTupleCount() > 0) {
write(i, buf);
}
closeWriter(i);
@@ -430,40 +467,43 @@
tableSize = (int) (memsize * recordsPerFrame * factor);
}
for (int partitionid = 0; partitionid < B; partitionid++) {
- RunFileWriter rWriter = rWriters[partitionid];
- RunFileWriter sWriter = sWriters[partitionid];
- if (rWriter == null || sWriter == null) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
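+ // An empty build partition can only be skipped for inner joins; under a
+ // left outer join its probe tuples must still be emitted with null padding.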
+ if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
- hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators));
+ hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
+ nullWriters1);
- RunFileReader rReader = rWriter.createReader();
- rReader.open();
- while (rReader.nextFrame(inBuffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame();
- FrameUtils.copy(inBuffer, copyBuffer);
- joiner.build(copyBuffer);
- inBuffer.clear();
+ if (buildWriter != null) {
+ RunFileReader buildReader = buildWriter.createReader();
+ buildReader.open();
+ while (buildReader.nextFrame(inBuffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(inBuffer, copyBuffer);
+ joiner.build(copyBuffer);
+ inBuffer.clear();
+ }
+ buildReader.close();
}
- rReader.close();
// probe
- RunFileReader sReader = sWriter.createReader();
- sReader.open();
- while (sReader.nextFrame(inBuffer)) {
+ RunFileReader probeReader = probeWriter.createReader();
+ probeReader.open();
+ while (probeReader.nextFrame(inBuffer)) {
joiner.join(inBuffer, writer);
inBuffer.clear();
}
- sReader.close();
+ probeReader.close();
joiner.closeJoin(writer);
}
}
writer.close();
- env.set(LARGERELATION, null);
- env.set(SMALLRELATION, null);
+ env.set(PROBERELATION, null);
+ env.set(BUILDRELATION, null);
env.set(JOINER0, null);
env.set(MEM_HASHTABLE, null);
env.set(NUM_PARTITION, null);
@@ -475,19 +515,19 @@
}
private void closeWriter(int i) throws HyracksDataException {
- RunFileWriter writer = sWriters[i];
+ RunFileWriter writer = probeWriters[i];
if (writer != null) {
writer.close();
}
}
private void write(int i, ByteBuffer head) throws HyracksDataException {
- RunFileWriter writer = sWriters[i];
+ RunFileWriter writer = probeWriters[i];
if (writer == null) {
- FileReference file = ctx.createWorkspaceFile(largeRelation);
+ FileReference file = ctx.createWorkspaceFile(relationName);
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
- sWriters[i] = writer;
+ probeWriters[i] = writer;
}
writer.nextFrame(head);
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 4d98c6a..94a9501 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.dataflow.std.join;
+import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -21,8 +22,10 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -30,36 +33,51 @@
public class InMemoryHashJoin {
private final Link[] table;
private final List<ByteBuffer> buffers;
- private final FrameTupleAccessor accessor0;
- private final ITuplePartitionComputer tpc0;
- private final FrameTupleAccessor accessor1;
- private final ITuplePartitionComputer tpc1;
+ private final FrameTupleAccessor accessorBuild;
+ private final ITuplePartitionComputer tpcBuild;
+ private final FrameTupleAccessor accessorProbe;
+ private final ITuplePartitionComputer tpcProbe;
private final FrameTupleAppender appender;
private final FrameTuplePairComparator tpComparator;
private final ByteBuffer outBuffer;
-
+ private final boolean isLeftOuter;
+ private final ArrayTupleBuilder nullTupleBuild;
+
public InMemoryHashJoin(IHyracksStageletContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
- FrameTuplePairComparator comparator) {
+ FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
+ throws HyracksDataException {
table = new Link[tableSize];
buffers = new ArrayList<ByteBuffer>();
- this.accessor0 = accessor0;
- this.tpc0 = tpc0;
- this.accessor1 = accessor1;
- this.tpc1 = tpc1;
+ this.accessorBuild = accessor1;
+ this.tpcBuild = tpc1;
+ this.accessorProbe = accessor0;
+ this.tpcProbe = tpc0;
appender = new FrameTupleAppender(ctx.getFrameSize());
tpComparator = comparator;
outBuffer = ctx.allocateFrame();
appender.reset(outBuffer, true);
+ this.isLeftOuter = isLeftOuter;
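+ // For left outer joins, build one null tuple up front; it pads the
+ // build-side fields of every probe tuple that finds no match.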
+ if (isLeftOuter) {
+ int fieldCountOuter = accessor1.getFieldCount();
+ nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter);
+ DataOutput out = nullTupleBuild.getDataOutput();
+ for (int i = 0; i < fieldCountOuter; i++) {
+ nullWriters1[i].writeNull(out);
+ nullTupleBuild.addFieldEndOffset();
+ }
+ } else {
+ nullTupleBuild = null;
+ }
}
public void build(ByteBuffer buffer) throws HyracksDataException {
buffers.add(buffer);
int bIndex = buffers.size() - 1;
- accessor0.reset(buffer);
- int tCount = accessor0.getTupleCount();
+ accessorBuild.reset(buffer);
+ int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
- int entry = tpc0.partition(accessor0, i, table.length);
+ int entry = tpcBuild.partition(accessorBuild, i, table.length);
long tPointer = (((long) bIndex) << 32) + i;
Link link = table[entry];
if (link == null) {
@@ -70,29 +88,41 @@
}
public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
- accessor1.reset(buffer);
- int tupleCount1 = accessor1.getTupleCount();
- for (int i = 0; i < tupleCount1; ++i) {
- int entry = tpc1.partition(accessor1, i, table.length);
+ accessorProbe.reset(buffer);
+ int tupleCount0 = accessorProbe.getTupleCount();
+ for (int i = 0; i < tupleCount0; ++i) {
+ int entry = tpcProbe.partition(accessorProbe, i, table.length);
Link link = table[entry];
+ boolean matchFound = false;
if (link != null) {
for (int j = 0; j < link.size; ++j) {
long pointer = link.pointers[j];
int bIndex = (int) ((pointer >> 32) & 0xffffffff);
int tIndex = (int) (pointer & 0xffffffff);
- accessor0.reset(buffers.get(bIndex));
- int c = tpComparator.compare(accessor0, tIndex, accessor1, i);
+ accessorBuild.reset(buffers.get(bIndex));
+ int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
if (c == 0) {
- if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+ matchFound = true;
+ if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
flushFrame(outBuffer, writer);
appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+ if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
throw new IllegalStateException();
}
}
}
}
}
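+ // No build-side match: for a left outer join the probe tuple is still
+ // emitted, concatenated with the prebuilt null tuple.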
+ if (!matchFound && isLeftOuter) {
+ if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+ nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+ nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
}
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 74f0146..13af25d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -46,6 +48,8 @@
private final int[] keys1;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
private final int tableSize;
public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
@@ -56,8 +60,25 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
- this.tableSize = tableSize;
recordDescriptors[0] = recordDescriptor;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ this.tableSize = tableSize;
+ }
+
+ public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+ int tableSize) {
+ super(spec, 2, 1);
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ recordDescriptors[0] = recordDescriptor;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
+ this.tableSize = tableSize;
}
@Override
@@ -66,10 +87,11 @@
HashProbeActivityNode hpa = new HashProbeActivityNode();
builder.addTask(hba);
- builder.addSourceEdge(0, hba, 0);
+ builder.addSourceEdge(1, hba, 0);
builder.addTask(hpa);
- builder.addSourceEdge(1, hpa, 0);
+ builder.addSourceEdge(0, hpa, 0);
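+ // Inputs are rewired so that input 1 feeds the hash-build activity and
+ // input 0 feeds the probe activity, matching the swapped build/probe
+ // accessors in InMemoryHashJoin.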
+
builder.addTargetEdge(0, hpa, 0);
builder.addBlockingEdge(hba, hpa);
@@ -88,6 +110,13 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private InMemoryHashJoin joiner;
@@ -99,7 +128,7 @@
.createPartitioner();
joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
- keys0, keys1, comparators));
+ keys0, keys1, comparators), isLeftOuter, nullWriters1);
}
@Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
new file mode 100644
index 0000000..5ee8fd5
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public class NestedLoopJoin {
+ private final FrameTupleAccessor accessorInner;
+ private final FrameTupleAccessor accessorOuter;
+ private final FrameTupleAppender appender;
+ private final ITuplePairComparator tpComparator;
+ private final ByteBuffer outBuffer;
+ private final ByteBuffer innerBuffer;
+ private final List<ByteBuffer> outBuffers;
+ private final int memSize;
+ private final IHyracksStageletContext ctx;
+ private RunFileReader runFileReader;
+ private int currentMemSize = 0;
+ private final RunFileWriter runFileWriter;
+
+ public NestedLoopJoin(IHyracksStageletContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
+ ITuplePairComparator comparators, int memSize) throws HyracksDataException {
+ this.accessorInner = accessor1;
+ this.accessorOuter = accessor0;
+ this.appender = new FrameTupleAppender(ctx.getFrameSize());
+ this.tpComparator = comparators;
+ this.outBuffer = ctx.allocateFrame();
+ this.innerBuffer = ctx.allocateFrame();
+ this.appender.reset(outBuffer, true);
+ this.outBuffers = new ArrayList<ByteBuffer>();
+ this.memSize = memSize;
+ this.ctx = ctx;
+
+ FileReference file = ctx.getJobletContext().createWorkspaceFile(
+ this.getClass().getSimpleName() + this.toString());
+ runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+ runFileWriter.open();
+ }
+
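+ // The inner relation is spilled to a run file so it can be re-read for
+ // every block of cached outer frames.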
+ public void cache(ByteBuffer buffer) throws HyracksDataException {
+ runFileWriter.nextFrame(buffer);
+ }
+
+ public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
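+ // Collect outer frames into a block of up to (memSize - 3) frames; once
+ // the block is full, the cached inner relation is re-scanned against each
+ // buffered outer frame and the block is then refilled in place.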
+ if (outBuffers.size() < memSize - 3) {
+ createAndCopyFrame(outerBuffer);
+ return;
+ }
+ if (currentMemSize < memSize - 3) {
+ reloadFrame(outerBuffer);
+ return;
+ }
+ for (ByteBuffer outBuffer : outBuffers) {
+ runFileReader = runFileWriter.createReader();
+ runFileReader.open();
+ while (runFileReader.nextFrame(innerBuffer)) {
+ blockJoin(outBuffer, innerBuffer, writer);
+ }
+ runFileReader.close();
+ }
+ currentMemSize = 0;
+ reloadFrame(outerBuffer);
+ }
+
+ private void createAndCopyFrame(ByteBuffer outerBuffer) {
+ ByteBuffer outerBufferCopy = ctx.allocateFrame();
+ FrameUtils.copy(outerBuffer, outerBufferCopy);
+ outBuffers.add(outerBufferCopy);
+ currentMemSize++;
+ }
+
+ private void reloadFrame(ByteBuffer outerBuffer) {
+ outBuffers.get(currentMemSize).clear();
+ FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
+ currentMemSize++;
+ }
+
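+ // Joins one outer frame with one inner frame: every qualifying pair of
+ // tuples is concatenated into the output frame, which is flushed when full.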
+ private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer, IFrameWriter writer)
+ throws HyracksDataException {
+ accessorOuter.reset(outerBuffer);
+ accessorInner.reset(innerBuffer);
+ int tupleCount0 = accessorOuter.getTupleCount();
+ int tupleCount1 = accessorInner.getTupleCount();
+
+ for (int i = 0; i < tupleCount0; ++i) {
+ for (int j = 0; j < tupleCount1; ++j) {
+ int c = compare(accessorOuter, i, accessorInner, j);
+ if (c == 0) {
+ if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void closeCache() throws HyracksDataException {
+ if (runFileWriter != null) {
+ runFileWriter.close();
+ }
+ }
+
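+ // Join the final, possibly partially filled block of outer frames against
+ // the cached inner relation, then flush any tuples left in the output frame.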
+ public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+ // Only the first currentMemSize buffers belong to the final block; later
+ // entries may hold stale frames from a block that was already joined.
+ for (int i = 0; i < currentMemSize; i++) {
+ ByteBuffer outBuffer = outBuffers.get(i);
+ runFileReader = runFileWriter.createReader();
+ runFileReader.open();
+ while (runFileReader.nextFrame(innerBuffer)) {
+ blockJoin(outBuffer, innerBuffer, writer);
+ }
+ runFileReader.close();
+ }
+ outBuffers.clear();
+ currentMemSize = 0;
+
+ if (appender.getTupleCount() > 0) {
+ flushFrame(outBuffer, writer);
+ }
+ }
+
+ private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ writer.nextFrame(buffer);
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ }
+
+ private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
+ throws HyracksDataException {
+ return tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
new file mode 100644
index 0000000..1436e30
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final String JOINER = "joiner";
+
+ private static final long serialVersionUID = 1L;
+ private final ITuplePairComparatorFactory comparatorFactory;
+ private final int memSize;
+
+ public NestedLoopJoinOperatorDescriptor(JobSpecification spec, ITuplePairComparatorFactory comparatorFactory,
+ RecordDescriptor recordDescriptor, int memSize) {
+ super(spec, 2, 1);
+ this.comparatorFactory = comparatorFactory;
+ this.recordDescriptors[0] = recordDescriptor;
+ this.memSize = memSize;
+ }
+
+ @Override
+ public void contributeTaskGraph(IActivityGraphBuilder builder) {
+ JoinCacheActivityNode jc = new JoinCacheActivityNode();
+ NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode();
+
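+ // Input 1 (the inner relation) feeds the cache activity, input 0 (the
+ // outer relation) feeds the join activity; the blocking edge ensures the
+ // inner relation is fully cached before any probing starts.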
+ builder.addTask(jc);
+ builder.addSourceEdge(1, jc, 0);
+
+ builder.addTask(nlj);
+ builder.addSourceEdge(0, nlj, 0);
+
+ builder.addTargetEdge(0, nlj, 0);
+ builder.addBlockingEdge(jc, nlj);
+ }
+
+ private class JoinCacheActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+ final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator();
+
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ private NestedLoopJoin joiner;
+
+ @Override
+ public void open() throws HyracksDataException {
+ joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+ new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(buffer, copyBuffer);
+ copyBuffer.flip();
+ joiner.cache(copyBuffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ joiner.closeCache();
+ env.set(JOINER, joiner);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
+ };
+ return op;
+ }
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return NestedLoopJoinOperatorDescriptor.this;
+ }
+ }
+
+ private class NestedLoopJoinActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+ final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+
+ IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private NestedLoopJoin joiner;
+
+ @Override
+ public void open() throws HyracksDataException {
+ joiner = (NestedLoopJoin) env.get(JOINER);
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ joiner.join(buffer, writer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ joiner.closeJoin(writer);
+ writer.close();
+ env.set(JOINER, null);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ writer.flush();
+ }
+ };
+ return op;
+ }
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return NestedLoopJoinOperatorDescriptor.this;
+ }
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
new file mode 100644
index 0000000..f82a7ef
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -0,0 +1,65 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
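+// Forwards every incoming frame to all of its outputs, letting one stream
+// feed several downstream operators with identical data.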
+public class SplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public SplitOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, int outputArity) {
+ super(spec, 1, outputArity);
+ for (int i = 0; i < outputArity; i++) {
+ recordDescriptors[i] = rDesc;
+ }
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx, IOperatorEnvironment env,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryInputOperatorNodePushable() {
+ private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+
+ @Override
+ public void close() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.close();
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ FrameUtils.flushFrame(buffer, writer);
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.open();
+ }
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ writers[index] = writer;
+ }
+ };
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
new file mode 100644
index 0000000..eccb945
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.union;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+
+public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public UnionAllOperatorDescriptor(JobSpecification spec, int nInputs, RecordDescriptor recordDescriptor) {
+ super(spec, nInputs, 1);
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ @Override
+ public void contributeTaskGraph(IActivityGraphBuilder builder) {
+ UnionActivityNode uba = new UnionActivityNode();
+ builder.addTask(uba);
+ for (int i = 0; i < inputArity; ++i) {
+ builder.addSourceEdge(i, uba, i);
+ }
+ builder.addTargetEdge(0, uba, 0);
+ }
+
+ private class UnionActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public UnionActivityNode() {
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ return new UnionOperator(ctx, inRecordDesc);
+ }
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return UnionAllOperatorDescriptor.this;
+ }
+
+ }
+
+ private class UnionOperator extends AbstractUnaryOutputOperatorNodePushable {
+ private int nOpened;
+
+ private int nClosed;
+
+ public UnionOperator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc) {
+ nOpened = 0;
+ nClosed = 0;
+ }
+
+ @Override
+ public int getInputArity() {
+ return inputArity;
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
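+ // Every input gets its own writer that funnels into the shared downstream
+ // writer; open/close are reference-counted so the output opens exactly once
+ // and closes only after the last input closes, while nextFrame calls are
+ // serialized on the operator's monitor.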
+ return new IFrameWriter() {
+ @Override
+ public void open() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ if (++nOpened == 1) {
+ writer.open();
+ }
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ writer.nextFrame(buffer);
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ writer.flush();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ if (++nClosed == inputArity) {
+ writer.close();
+ }
+ }
+ }
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
new file mode 100644
index 0000000..2b32142
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+
+public class SplitOperatorTest extends AbstractIntegrationTest {
+
+ public void compareFiles(String fileNameA, String fileNameB) throws IOException {
+ BufferedReader fileA = new BufferedReader(new FileReader(fileNameA));
+ BufferedReader fileB = new BufferedReader(new FileReader(fileNameB));
+
+ String lineA, lineB;
+ while ((lineA = fileA.readLine()) != null) {
+ lineB = fileB.readLine();
+ Assert.assertEquals(lineA, lineB);
+ }
+ Assert.assertNull(fileB.readLine());
+ fileA.close();
+ fileB.close();
+ }
+
+ @Test
+ public void test() throws Exception {
+ final int outputArity = 2;
+
+ JobSpecification spec = new JobSpecification();
+
+ String inputFileName = "data/words.txt";
+ File[] outputFile = new File[outputArity];
+ for (int i = 0; i < outputArity; i++) {
+ outputFile[i] = File.createTempFile("splitop", null);
+ outputFile[i].deleteOnExit();
+ }
+
+ FileSplit[] inputSplits = new FileSplit[] { new FileSplit(NC1_ID, inputFileName) };
+
+ String[] locations = new String[] { NC1_ID };
+
+ DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory(
+ new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000');
+ RecordDescriptor stringRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE, });
+
+ FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, new ConstantFileSplitProvider(
+ inputSplits), stringParser, stringRec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations);
+
+ SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, locations);
+
+ IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
+ for (int i = 0; i < outputArity; i++) {
+ outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] { new FileSplit(NC1_ID,
+ outputFile[i].getAbsolutePath()) });
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
+ }
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
+ for (int i = 0; i < outputArity; i++) {
+ spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
+ }
+
+ for (int i = 0; i < outputArity; i++) {
+ spec.addRoot(outputOp[i]);
+ }
+ runTest(spec);
+
+ for (int i = 0; i < outputArity; i++) {
+ compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 4958a35..db5985d 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -14,7 +14,9 @@
*/
package edu.uci.ics.hyracks.tests.integration;
+import java.io.DataOutput;
import java.io.File;
+import java.io.IOException;
import org.junit.Test;
@@ -23,8 +25,11 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
@@ -51,6 +56,29 @@
public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
private static final boolean DEBUG = true;
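+ // Pads null fields with a two-byte zero, effectively an empty UTF-8 string;
+ // that is sufficient for the printed output of these tests.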
+ private static class NoopNullWriterFactory implements INullWriterFactory {
+
+ private static final long serialVersionUID = 1L;
+ public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();
+
+ private NoopNullWriterFactory() {
+ }
+
+ @Override
+ public INullWriter createNullWriter() {
+ return new INullWriter() {
+ @Override
+ public void writeNull(DataOutput out) throws HyracksDataException {
+ try {
+ out.writeShort(0);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+ }
+
/*
* TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
*/
@@ -274,6 +302,251 @@
}
@Test
+ public void customerOrderCIDInMemoryHashLeftOuterJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/customer.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/orders.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
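+ // isLeftOuter = true plus the null writer factories select the left outer
+ // variant: every customer appears in the output even without matching orders.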
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
+ new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+ nullWriterFactories, 128);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ // "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+ // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDGraceHashLeftOuterJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/customer.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/orders.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
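+ // Grace hash join variant of the same left outer test; both inputs are
+ // fully partitioned to disk and joined partition by partition.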
+ GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+ new int[] { 0 }, new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+ nullWriterFactories);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ // "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+ // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDHybridHashLeftOuterJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/customer.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/orders.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
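+ // Same left outer setup; the leading arguments (5, 20, 200, 1.2) appear to
+ // be memSize, inputSize0, recordsPerFrame, and factor, the sizing hints
+ // used by the hybrid partitioning logic.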
+ HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+ new int[] { 0 }, new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+ nullWriterFactories);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ // "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+ // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
public void customerOrderCIDJoinMulti() throws Exception {
JobSpecification spec = new JobSpecification();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
new file mode 100644
index 0000000..cab54f4
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -0,0 +1,335 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+
+public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest {
+ private static final boolean DEBUG = true;
+
+ static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final IBinaryComparatorFactory bFactory;
+ private final int pos0;
+ private final int pos1;
+
+ public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
+ this.bFactory = bFactory;
+ this.pos0 = pos0;
+ this.pos1 = pos1;
+ }
+
+ @Override
+ public ITuplePairComparator createTuplePairComparator() {
+ return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
+ }
+ }
+
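+    // Adapts a single-field IBinaryComparator to the ITuplePairComparator
+    // interface the nested loop join expects: the configured field of each
+    // tuple is compared directly in the frame buffers, without deserialization.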
+ static class JoinComparator implements ITuplePairComparator {
+
+ private final IBinaryComparator bComparator;
+ private final int field0;
+ private final int field1;
+
+ public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
+ this.bComparator = bComparator;
+ this.field0 = field0;
+ this.field1 = field1;
+ }
+
+ @Override
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+ int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+ int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+ .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            return c;
+ }
+ }
+
+ /*
+ * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL,
+ * C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY
+ * INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT
+ * NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL );
+ * TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL,
+ * O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE
+ * DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY
+ * CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT
+ * NULL, O_COMMENT VARCHAR(79) NOT NULL );
+ */
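+    // Note that every column is scanned, serialized, and joined as a UTF8
+    // string regardless of the SQL types above, so the C_CUSTKEY/O_CUSTKEY
+    // join keys are compared as text.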
+ @Test
+ public void customerOrderCIDJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/customer.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/orders.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
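+        // The comparator matches field 1 (O_CUSTKEY, assuming accessor0 is
+        // join input 0) against field 0 (C_CUSTKEY) of the other side; we take
+        // the trailing argument (4) to be the join's frame budget, inferred
+        // from the call site.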
+ NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+ UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 4);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDJoinMulti() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+ NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+ UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 5);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
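+        // A nested loop join must see the entire inner relation at every join
+        // partition, so the customer input is replicated M-to-N rather than
+        // partitioned.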
+ IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDJoinAutoExpand() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
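+        // "Auto expand": only a partition count of 2 is pinned on the join,
+        // leaving the actual node placement to the cluster controller's
+        // scheduler instead of absolute location constraints.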
+ NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+ UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 6);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
new file mode 100644
index 0000000..b1089fe
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+
+public class UnionTest extends AbstractIntegrationTest {
+ @Test
+ public void union01() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit(NC2_ID, new FileReference(new File("data/words.txt"))),
+ new FileSplit(NC1_ID, new FileReference(new File("data/words.txt"))) });
+
+ RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor csvScanner01 = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner01, NC2_ID, NC1_ID);
+
+ FileScanOperatorDescriptor csvScanner02 = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner02, NC2_ID, NC1_ID);
+
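+        // UnionAllOperatorDescriptor takes the input arity (2) and the shared
+        // record descriptor; it merges both scans' streams without
+        // de-duplication or any ordering guarantee.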
+ UnionAllOperatorDescriptor unionAll = new UnionAllOperatorDescriptor(spec, 2, desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, unionAll, NC2_ID, NC1_ID);
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner01, 0, unionAll, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner02, 0, unionAll, 1);
+ spec.connect(new OneToOneConnectorDescriptor(spec), unionAll, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
index 40cb7da..0d5dc8f 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
@@ -32,6 +32,8 @@
public int getTokenLength();
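+    // Total number of tokens in the current input; the implementations
+    // touched by this change return a sentinel (-1 or 0) when the count
+    // cannot be computed up front.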
+ public int getNumTokens();
+
public void writeToken(DataOutput dos) throws IOException;
public RecordDescriptor getTokenSchema();
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
index 73635f9..425436b 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
@@ -30,6 +30,7 @@
new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
private final char delimiter;
+ private final byte typeTag;
private byte[] data;
private int start;
private int length;
@@ -38,8 +39,14 @@
private int tokenStart;
private int pos;
+ public DelimitedUTF8StringBinaryTokenizer(char delimiter, byte typeTag) {
+ this.delimiter = delimiter;
+ this.typeTag = typeTag;
+ }
+
public DelimitedUTF8StringBinaryTokenizer(char delimiter) {
this.delimiter = delimiter;
+ this.typeTag = -1;
}
@Override
@@ -88,6 +95,9 @@
@Override
public void writeToken(DataOutput dos) throws IOException {
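+        // Optionally prepend a serde type tag byte so tokens can feed
+        // type-tagged pipelines; the -1 sentinel from the single-argument
+        // constructor disables this. Note that a tag value of 0 would also
+        // be skipped by the > 0 check.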
+ if (typeTag > 0)
+ dos.write(typeTag);
+
// WARNING: 2-byte length indicator is specific to UTF-8
dos.writeShort((short) tokenLength);
dos.write(data, tokenStart, tokenLength);
@@ -97,4 +107,10 @@
public RecordDescriptor getTokenSchema() {
return tokenSchema;
}
+
+    // Token counting is not supported by this tokenizer; -1 signals an
+    // unknown count.
+ @Override
+ public int getNumTokens() {
+ return -1;
+ }
}
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
index e3e0be3..2e85db5 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
@@ -22,13 +22,20 @@
private static final long serialVersionUID = 1L;
private final char delimiter;
+ private final byte typeTag;
+
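+    // New two-argument form: callers can ask for each token to carry a
+    // leading type tag; the original constructor keeps the untagged behavior
+    // via the -1 sentinel.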
+ public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter, byte typeTag) {
+ this.delimiter = delimiter;
+ this.typeTag = typeTag;
+ }
public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter) {
this.delimiter = delimiter;
+ this.typeTag = -1;
}
@Override
public IBinaryTokenizer createBinaryTokenizer() {
- return new DelimitedUTF8StringBinaryTokenizer(delimiter);
+ return new DelimitedUTF8StringBinaryTokenizer(delimiter, typeTag);
}
}
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
index 54fc371..2cc0b6c 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
@@ -142,4 +142,9 @@
public RecordDescriptor getTokenSchema() {
return tokenSchema;
}
-}
+
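+    // Returns 0 where the delimited tokenizer returns -1; presumably both
+    // signal that the token count is unavailable.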
+ @Override
+ public int getNumTokens() {
+ return 0;
+ }
+}
\ No newline at end of file