Merged in GraceHashJoin. Added tests for GHJ.
git-svn-id: https://hyracks.googlecode.com/svn/trunk@27 123451ca-8445-de46-9d55-352943316053
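For reference, a minimal usage sketch of the operator this change merges in. The class names, package, and constructor signature are taken from the new GraceHashJoinOperatorDescriptor in this patch; the wrapper class, parameter values, and key positions below are illustrative assumptions only, not part of the change.

    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.api.job.JobSpecification;
    import edu.uci.ics.hyracks.coreops.join.GraceHashJoinOperatorDescriptor;

    // Hypothetical helper (not in this patch): instantiates the new grace hash join.
    // The factories, output record descriptor, and key columns are assumed to be
    // supplied by whoever builds the surrounding job.
    public class GraceHashJoinUsageSketch {
        public static GraceHashJoinOperatorDescriptor createJoin(JobSpecification spec,
                IBinaryHashFunctionFactory[] hashFns, IBinaryComparatorFactory[] cmps,
                RecordDescriptor joinOutputDesc) {
            return new GraceHashJoinOperatorDescriptor(spec,
                    4,               // memsize: frames available to the join
                    200,             // inputsize0: estimated size (in frames) of input 0 (the build relation, "RelR")
                    100,             // recordsPerFrame
                    1.2,             // factor: used when estimating the number of grace partitions
                    new int[] { 0 }, // keys0: join key fields of input 0
                    new int[] { 0 }, // keys1: join key fields of input 1 ("RelS")
                    hashFns, cmps, joinOutputDesc);
        }
    }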
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
index 5b57ee3..6a3a447 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
@@ -30,8 +30,8 @@
public boolean supportsPullInterface();
public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition);
+ int partition, int nPartitions);
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition);
+ int partition, int nPartitions);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index af60240..367a356 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -90,9 +90,7 @@
jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
jobManager = new JOLJobManagerImpl(this, jolRuntime);
taskExecutor = Executors.newCachedThreadPool();
- webServer = new WebServer(new Handler[] {
- getAdminConsoleHandler(), getApplicationDataHandler()
- });
+ webServer = new WebServer(new Handler[] { getAdminConsoleHandler(), getApplicationDataHandler() });
this.timer = new Timer(true);
}
@@ -176,7 +174,7 @@
@Override
public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
- StageletStatistics statistics) throws Exception {
+ StageletStatistics statistics) throws Exception {
jobManager.notifyStageletComplete(jobId, stageId, attempt, nodeId, statistics);
}
@@ -205,7 +203,7 @@
handler.setHandler(new AbstractHandler() {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
- HttpServletResponse response) throws IOException, ServletException {
+ HttpServletResponse response) throws IOException, ServletException {
if (!"/".equals(target)) {
return;
}
@@ -242,7 +240,7 @@
handler.setHandler(new AbstractHandler() {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
- HttpServletResponse response) throws IOException, ServletException {
+ HttpServletResponse response) throws IOException, ServletException {
}
});
return handler;
@@ -355,7 +353,7 @@
private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
- Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
this.nodeId = nodeId;
this.jobId = jobId;
this.plan = plan;
@@ -391,8 +389,8 @@
private Map<PortInstanceId, Endpoint> globalPortMap;
public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId,
- Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
- Map<PortInstanceId, Endpoint> globalPortMap) {
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+ Map<PortInstanceId, Endpoint> globalPortMap) {
this.nodeId = nodeId;
this.jobId = jobId;
this.plan = plan;
@@ -530,7 +528,7 @@
}
static class PortMapMergingAccumulator implements
- Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
+ Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
@Override
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 9d7862a..e8b6f14 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -212,7 +212,7 @@
}
}
ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
- .size()];
+ .size()];
int i = 0;
for (List t2 : ts) {
Object[] t2Data = t2.toArray();
@@ -229,18 +229,18 @@
aParts.add((Integer) lData[1]);
}
p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
- plan, stageId, attempt, tasks, opPartitions);
+ plan, stageId, attempt, tasks, opPartitions);
}
LOGGER.info("Stage start - Phase 1");
Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
- new ClusterControllerService.PortMapMergingAccumulator());
+ new ClusterControllerService.PortMapMergingAccumulator());
ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
- .size()];
+ .size()];
ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
- .size()];
+ .size()];
ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
- .size()];
+ .size()];
i = 0;
for (List t2 : ts) {
Object[] t2Data = t2.toArray();
@@ -257,11 +257,11 @@
aParts.add((Integer) lData[1]);
}
p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
- plan, stageId, tasks, opPartitions, globalPortMap);
+ plan, stageId, tasks, opPartitions, globalPortMap);
p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
- stageId);
+ stageId);
ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId,
- stageId);
+ stageId);
++i;
}
LOGGER.info("Stage start - Phase 2");
@@ -296,7 +296,7 @@
UUID jobId = (UUID) data[0];
Set<String> ts = (Set<String>) data[1];
ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
- .size()];
+ .size()];
int i = 0;
for (String n : ts) {
jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
@@ -305,7 +305,7 @@
ccs.runRemote(jcns, null);
} finally {
BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable
- .createTuple(jobId));
+ .createTuple(jobId));
jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
jolRuntime.evaluate();
}
@@ -337,22 +337,22 @@
Integer attempt = (Integer) data[2];
Set<List> ts = (Set<List>) data[4];
ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
- .size()];
+ .size()];
int i = 0;
BasicTupleSet notificationTuples = new BasicTupleSet();
for (List t2 : ts) {
Object[] t2Data = t2.toArray();
String nodeId = (String) t2Data[0];
jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId,
- attempt);
+ attempt);
notificationTuples.add(AbortNotifyTable
- .createTuple(jobId, stageId, nodeId, attempt));
+ .createTuple(jobId, stageId, nodeId, attempt));
}
try {
ccs.runRemote(jas, null);
} finally {
jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples,
- null);
+ null);
jolRuntime.evaluate();
}
} catch (Exception e) {
@@ -387,14 +387,14 @@
@Override
public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.OUTPUT, operatorOutputIndex,
- taskOutputIndex));
+ taskOutputIndex));
builder.addTargetEdge(operatorOutputIndex, task, taskOutputIndex);
}
@Override
public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.INPUT, operatorInputIndex,
- taskInputIndex));
+ taskInputIndex));
builder.addSourceEdge(operatorInputIndex, task, taskInputIndex);
}
@@ -437,7 +437,7 @@
}
private int addPartitionConstraintTuples(UUID jobId, IOperatorDescriptor od, BasicTupleSet olTuples,
- BasicTupleSet ocTuples) {
+ BasicTupleSet ocTuples) {
PartitionConstraint pc = od.getPartitionConstraint();
switch (pc.getPartitionConstraintType()) {
@@ -457,7 +457,7 @@
}
private void addLocationConstraintTuple(BasicTupleSet olTuples, UUID jobId, OperatorDescriptorId opId, int i,
- LocationConstraint locationConstraint, int benefit) {
+ LocationConstraint locationConstraint, int benefit) {
switch (locationConstraint.getConstraintType()) {
case ABSOLUTE:
String nodeId = ((AbsoluteLocationConstraint) locationConstraint).getLocationId();
@@ -527,8 +527,8 @@
}
@Override
- public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
- StageletStatistics statistics) throws Exception {
+ public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+ StageletStatistics statistics) throws Exception {
BasicTupleSet scTuples = new BasicTupleSet();
scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics));
@@ -538,7 +538,8 @@
}
@Override
- public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
+ public synchronized void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId)
+ throws Exception {
BasicTupleSet sfTuples = new BasicTupleSet();
sfTuples.add(StageletFailureTable.createTuple(jobId, stageId, nodeId, attempt));
@@ -598,9 +599,8 @@
private static Key PRIMARY_KEY = new Key(0);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class, Set.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, JobStatus.class, JobSpecification.class,
+ JobPlan.class, Set.class };
public JobTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -645,9 +645,8 @@
private static Key PRIMARY_KEY = new Key(0, 1);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, OperatorDescriptorId.class, Integer.class, IOperatorDescriptor.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+ IOperatorDescriptor.class };
public OperatorDescriptorTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -667,9 +666,8 @@
private static Key PRIMARY_KEY = new Key();
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, OperatorDescriptorId.class, String.class, Integer.class, Integer.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, String.class,
+ Integer.class, Integer.class };
public OperatorLocationTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -689,9 +687,7 @@
private static Key PRIMARY_KEY = new Key();
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, OperatorDescriptorId.class, Integer.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class };
public OperatorCloneCountTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -711,15 +707,9 @@
private static Key PRIMARY_KEY = new Key(0, 1);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class,
- ConnectorDescriptorId.class,
- OperatorDescriptorId.class,
- Integer.class,
- OperatorDescriptorId.class,
- Integer.class,
- IConnectorDescriptor.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, ConnectorDescriptorId.class,
+ OperatorDescriptorId.class, Integer.class, OperatorDescriptorId.class, Integer.class,
+ IConnectorDescriptor.class };
public ConnectorDescriptorTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -730,8 +720,8 @@
int srcPort = jobSpec.getProducerOutputIndex(conn);
IOperatorDescriptor destOD = jobSpec.getConsumer(conn);
int destPort = jobSpec.getConsumerInputIndex(conn);
- Tuple cdTuple = new Tuple(jobId, conn.getConnectorId(), srcOD.getOperatorId(), srcPort, destOD
- .getOperatorId(), destPort, conn);
+ Tuple cdTuple = new Tuple(jobId, conn.getConnectorId(), srcOD.getOperatorId(), srcPort,
+ destOD.getOperatorId(), destPort, conn);
return cdTuple;
}
}
@@ -745,9 +735,8 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, IActivityNode.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class,
+ ActivityNodeId.class, IActivityNode.class };
public ActivityNodeTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -755,7 +744,7 @@
static Tuple createTuple(UUID jobId, IActivityNode aNode) {
return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), aNode.getActivityNodeId(),
- aNode);
+ aNode);
}
}
@@ -768,17 +757,16 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, OperatorDescriptorId.class, Integer.class, Direction.class, ActivityNodeId.class, Integer.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+ Direction.class, ActivityNodeId.class, Integer.class };
public ActivityConnectionTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
static Tuple createTuple(UUID jobId, IActivityNode aNode, Direction direction, int odPort, int activityPort) {
- return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), odPort, direction, aNode
- .getActivityNodeId(), activityPort);
+ return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), odPort, direction,
+ aNode.getActivityNodeId(), activityPort);
}
}
@@ -791,13 +779,8 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2, 3, 4);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class,
- OperatorDescriptorId.class,
- ActivityNodeId.class,
- OperatorDescriptorId.class,
- ActivityNodeId.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class,
+ ActivityNodeId.class, OperatorDescriptorId.class, ActivityNodeId.class };
public ActivityBlockedTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -819,9 +802,7 @@
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart");
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, Long.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, Long.class };
public JobStartTable() {
super(TABLE_NAME, SCHEMA);
@@ -841,9 +822,8 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, JobPlan.class,
+ Set.class };
public StartMessageTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -859,9 +839,7 @@
private static Key PRIMARY_KEY = new Key(0);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, Set.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, Set.class };
public JobCleanUpTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -875,9 +853,7 @@
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanupcomplete");
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class };
public JobCleanUpCompleteTable() {
super(TABLE_NAME, SCHEMA);
@@ -897,16 +873,15 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, UUID.class, String.class, Integer.class, StageletStatistics.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class,
+ StageletStatistics.class };
public StageletCompleteTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt,
- StageletStatistics statistics) {
+ StageletStatistics statistics) {
return new Tuple(jobId, stageId, nodeId, attempt, statistics);
}
}
@@ -920,9 +895,7 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, UUID.class, String.class, Integer.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class };
public StageletFailureTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -942,9 +915,7 @@
private static Key PRIMARY_KEY = new Key(0);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- String.class
- };
+ private static final Class[] SCHEMA = new Class[] { String.class };
public AvailableNodesTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -964,9 +935,7 @@
private static Key PRIMARY_KEY = new Key(0);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- String.class, Integer.class
- };
+ private static final Class[] SCHEMA = new Class[] { String.class, Integer.class };
public RankedAvailableNodesTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -986,9 +955,7 @@
private static Key PRIMARY_KEY = new Key(0);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- String.class
- };
+ private static final Class[] SCHEMA = new Class[] { String.class };
public FailedNodesTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -1008,9 +975,8 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, JobPlan.class,
+ Set.class };
public AbortMessageTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -1026,9 +992,7 @@
private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, UUID.class, String.class, Integer.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class };
public AbortNotifyTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -1043,9 +1007,8 @@
private static final String TABLE_NAME = "expandpartitioncountconstraint";
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] {
- UUID.class, OperatorDescriptorId.class, Integer.class, Integer.class
- };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+ Integer.class };
public ExpandPartitionCountConstraintTableFunction() {
super(TABLE_NAME, SCHEMA);
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index 195fe00..758a2a0 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -151,7 +151,7 @@
Matcher m = pattern.matcher(ipaddrStr);
if (!m.matches()) {
throw new Exception(MessageFormat.format(
- "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
+                    "Connection Manager IP Address String %s is not a valid IP Address.", ipaddrStr));
}
byte[] ipBytes = new byte[4];
ipBytes[0] = (byte) Integer.parseInt(m.group(1));
@@ -163,7 +163,8 @@
@Override
public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
- Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) throws Exception {
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions)
+ throws Exception {
LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
final Joblet joblet = getLocalJoblet(jobId);
@@ -184,7 +185,8 @@
IOperatorDescriptor op = han.getOwner();
List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
for (int i : tasks.get(hanId)) {
- IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i);
+ IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i,
+ opPartitions.get(op.getOperatorId()).size());
OperatorRunnable or = new OperatorRunnable(ctx, hon);
stagelet.setOperator(op.getOperatorId(), i, or);
if (inputs != null) {
@@ -194,21 +196,21 @@
}
IConnectorDescriptor conn = inputs.get(j);
OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
- .getOperatorId();
+ .getOperatorId();
OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
- .getOperatorId();
+ .getOperatorId();
Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
endpointList.add(endpoint);
DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId, stageId);
connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
- .getTaskInputMap().get(hanId).get(j), i);
+ .getTaskInputMap().get(hanId).get(j), i);
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
}
portMap.put(piId, endpoint);
IFrameReader reader = createReader(conn, drlf, i, plan, stagelet, opPartitions
- .get(producerOpId).size(), opPartitions.get(consumerOpId).size());
+ .get(producerOpId).size(), opPartitions.get(consumerOpId).size());
or.setFrameReader(reader);
}
}
@@ -222,10 +224,10 @@
}
private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
- final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
- throws HyracksDataException {
+ final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
+ throws HyracksDataException {
final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex, nProducerCount,
- nConsumerCount);
+ nConsumerCount);
return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
private int frameCount;
@@ -248,17 +250,18 @@
@Override
public void close() throws HyracksDataException {
reader.close();
- stagelet.getStatistics().getStatisticsMap().put(
- "framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
- String.valueOf(frameCount));
+ stagelet.getStatistics()
+ .getStatisticsMap()
+ .put("framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
+ String.valueOf(frameCount));
}
} : reader;
}
@Override
public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId,
- Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
- final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+ final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
final Joblet ji = getLocalJoblet(jobId);
Stagelet si = (Stagelet) ji.getStagelet(stageId);
@@ -278,15 +281,15 @@
for (int j = 0; j < outputs.size(); ++j) {
final IConnectorDescriptor conn = outputs.get(j);
OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
- .getOperatorId();
+ .getOperatorId();
OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
- .getOperatorId();
+ .getOperatorId();
final int senderIndex = i;
IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
- Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+ Direction.INPUT, spec.getConsumerInputIndex(conn), index);
Endpoint ep = globalPortMap.get(piId);
if (ep == null) {
LOGGER.info("Got null Endpoint for " + piId);
@@ -295,12 +298,12 @@
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
}
- return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
- .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+ return createWriter(connectionManager.connect(ep.getNetworkAddress(),
+ ep.getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
}
};
- or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i, opPartitions.get(
- producerOpId).size(), opPartitions.get(consumerOpId).size()));
+ or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i,
+ opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size()));
}
}
stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
@@ -309,7 +312,7 @@
}
private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
- final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
+ final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameWriter() {
private int frameCount;
@@ -328,9 +331,10 @@
@Override
public void close() throws HyracksDataException {
writer.close();
- stagelet.getStatistics().getStatisticsMap().put(
- "framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex,
- String.valueOf(frameCount));
+ stagelet.getStatistics()
+ .getStatisticsMap()
+ .put("framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "."
+ + receiverIndex, String.valueOf(frameCount));
}
} : writer;
}
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index e16cf30..32793eb 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -113,8 +113,8 @@
return;
}
try {
- LOGGER.log(Level.INFO, "Starting runnable for operator: " + joblet.getJobId() + ":" + stageId + ":"
- + opIId.getOperatorId() + ":" + opIId.getPartition());
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + ": STARTING");
} catch (Exception e) {
e.printStackTrace();
// notifyOperatorFailure(opIId);
@@ -126,6 +126,13 @@
e.printStackTrace();
// notifyOperatorFailure(opIId);
}
+ try {
+ LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+ + opIId.getPartition() + ": TERMINATED");
+ } catch (Exception e) {
+ e.printStackTrace();
+ // notifyOperatorFailure(opIId);
+ }
}
});
}
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
index e5052b2..457a44c 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
@@ -92,13 +92,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
return null;
}
@Override
public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition) {
+ final IOperatorEnvironment env, int partition, int nPartitions) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -306,13 +306,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
return null;
}
@Override
public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition) {
+ final IOperatorEnvironment env, int partition, int nPartitions) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
index 45e882a..1c7e6c7 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
@@ -77,13 +77,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
return null;
}
@Override
public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition) {
+ final IOperatorEnvironment env, int partition, int nPartitions) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -240,13 +240,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
return null;
}
@Override
public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition) {
+ final IOperatorEnvironment env, int partition, int nPartitions) {
IOperatorNodePushable op = new IOperatorNodePushable() {
private IFrameWriter writer;
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
index faad82c..807e890 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
@@ -71,13 +71,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new MapperOperator(), plan, getActivityNodeId());
}
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
index 5e52989..9f06d7b 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
@@ -63,13 +63,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition) {
+ final IOperatorEnvironment env, int partition, int nPartitions) {
return new IOperatorNodePushable() {
private FileChannel out;
private int frameCount;
@@ -141,13 +141,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition) {
+ final IOperatorEnvironment env, int partition, int nPartitions) {
return new IOperatorNodePushable() {
private IFrameWriter writer;
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
index fd32245..a07a2f9 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
@@ -59,13 +59,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(), plan, getActivityNodeId());
}
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
index feb4d2f..08ec2bb 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
@@ -50,13 +50,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, final IOperatorEnvironment env,
- int partition) {
+ public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
+ final IOperatorEnvironment env, int partition, int nPartitions) {
IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
private ArrayList<Object[]> buffer;
@@ -105,13 +105,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, final IOperatorEnvironment env,
- int partition) {
+ public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
+ final IOperatorEnvironment env, int partition, int nPartitions) {
IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
private IOpenableDataWriter<Object[]> writer;
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
index 82e805b..a4571ee 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
@@ -102,13 +102,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), plan, getActivityNodeId());
}
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
index 5bf6a86..d04df29 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
@@ -73,14 +73,14 @@
protected FileSplit[] splits;
public FileSplit[] getSplits() {
- return splits;
- }
+ return splits;
+ }
- public void setSplits(FileSplit[] splits) {
- this.splits = splits;
- }
+ public void setSplits(FileSplit[] splits) {
+ this.splits = splits;
+ }
- public AbstractFileWriteOperatorDescriptor(JobSpecification spec, FileSplit[] splits) {
+ public AbstractFileWriteOperatorDescriptor(JobSpecification spec, FileSplit[] splits) {
super(spec, 1, 0);
this.splits = splits;
}
@@ -89,13 +89,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new FileWriteOperator(partition), plan, getActivityNodeId());
}
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
index 2fc6b63..664c84d 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
@@ -74,13 +74,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition) {
+ final IOperatorEnvironment env, int partition, int nPartitions) {
final RecordDescriptor rd0 = plan.getJobSpecification()
.getOperatorInputRecordDescriptor(getOperatorId(), 0);
final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, rd0);
@@ -135,13 +135,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, final IOperatorEnvironment env,
- int partition) {
+ public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
+ final IOperatorEnvironment env, int partition, int nPartitions) {
return new IOperatorNodePushable() {
private IFrameWriter writer;
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
index 0232d73..e7e04a5 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
@@ -44,13 +44,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
IComparator[] comparators = new IComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createComparator();
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
index 5ee84e47..90502b0 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
@@ -14,9 +14,9 @@
*/
package edu.uci.ics.hyracks.coreops.hadoop;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Counters.Counter;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
@@ -143,13 +143,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), plan, getActivityNodeId());
}
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
index 5385e44..c144365 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
@@ -33,7 +33,6 @@
import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
-import edu.uci.ics.hyracks.hadoop.util.ClasspathBasedHadoopClassFactory;
import edu.uci.ics.hyracks.hadoop.util.DatatypeHelper;
import edu.uci.ics.hyracks.hadoop.util.IHadoopClassFactory;
@@ -91,7 +90,7 @@
private static final long serialVersionUID = 1L;
private static final String mapClassNameKey = "mapred.mapper.class";
private Class<? extends Mapper> mapperClass;
-
+
public HadoopMapperOperatorDescriptor(JobSpecification spec, Class<? extends Mapper> mapperClass,
RecordDescriptor recordDescriptor, JobConf jobConf) {
super(spec, recordDescriptor, jobConf, null);
@@ -102,20 +101,24 @@
super(spec, null, jobConf, hadoopClassFactory);
}
- public RecordDescriptor getRecordDescriptor(JobConf conf){
- RecordDescriptor recordDescriptor = null;
- String mapOutputKeyClassName = conf.getMapOutputKeyClass().getName();
- String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
- try{
- if(getHadoopClassFactory() == null){
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class < ? extends Writable>) Class.forName(mapOutputKeyClassName),(Class < ? extends Writable>) Class.forName(mapOutputValueClassName));
- }else{
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class < ? extends Writable>) getHadoopClassFactory().loadClass(mapOutputKeyClassName),(Class < ? extends Writable>) getHadoopClassFactory().loadClass(mapOutputValueClassName));
- }
- }catch(Exception e){
- e.printStackTrace();
- }
- return recordDescriptor;
+ public RecordDescriptor getRecordDescriptor(JobConf conf) {
+ RecordDescriptor recordDescriptor = null;
+ String mapOutputKeyClassName = conf.getMapOutputKeyClass().getName();
+ String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
+ try {
+ if (getHadoopClassFactory() == null) {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
+ (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
+ } else {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) getHadoopClassFactory().loadClass(mapOutputKeyClassName),
+ (Class<? extends Writable>) getHadoopClassFactory().loadClass(mapOutputValueClassName));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return recordDescriptor;
}
private Mapper<K1, V1, K2, V2> createMapper() throws Exception {
@@ -123,21 +126,21 @@
return mapperClass.newInstance();
} else {
String mapperClassName = super.getJobConfMap().get(mapClassNameKey);
- Object mapper = getHadoopClassFactory().createMapper(mapperClassName);
+ Object mapper = getHadoopClassFactory().createMapper(mapperClassName);
mapperClass = (Class<? extends Mapper>) mapper.getClass();
- return (Mapper)mapper;
+ return (Mapper) mapper;
}
}
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new MapperOperator(), plan, getActivityNodeId());
}
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
index 7881d45..e7bfa31 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
@@ -22,11 +22,11 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Counters.Counter;
import edu.uci.ics.hyracks.api.dataflow.IDataReader;
import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
@@ -183,7 +183,6 @@
private static final String reducerClassKey = "mapred.reducer.class";
private static final String comparatorClassKey = "mapred.output.value.groupfn.class";
private IComparatorFactory comparatorFactory;
-
public HadoopReducerOperatorDescriptor(JobSpecification spec, IComparatorFactory comparatorFactory,
Class<? extends Reducer> reducerClass, RecordDescriptor recordDescriptor, JobConf jobConf) {
@@ -192,49 +191,49 @@
this.reducerClass = reducerClass;
}
-
public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IHadoopClassFactory classFactory) {
- super(spec, null , conf, classFactory);
+ super(spec, null, conf, classFactory);
}
-
+
private Reducer<K2, V2, K3, V3> createReducer() throws Exception {
- if(reducerClass != null){
- return reducerClass.newInstance();
- }else{
- Object reducer = getHadoopClassFactory().createReducer(getJobConfMap().get(reducerClassKey));
+ if (reducerClass != null) {
+ return reducerClass.newInstance();
+ } else {
+ Object reducer = getHadoopClassFactory().createReducer(getJobConfMap().get(reducerClassKey));
reducerClass = (Class<? extends Reducer>) reducer.getClass();
- return (Reducer)reducer;
+ return (Reducer) reducer;
}
}
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
try {
- if(this.comparatorFactory == null){
- String comparatorClassName = getJobConfMap().get(comparatorClassKey);
- RawComparator rawComparator = null;
- if(comparatorClassName != null){
- Class comparatorClazz = getHadoopClassFactory().loadClass(comparatorClassName);
- this.comparatorFactory = new KeyComparatorFactory(comparatorClazz);
-
- }else{
- String mapOutputKeyClass = getJobConfMap().get("mapred.mapoutput.key.class");
- if(getHadoopClassFactory() != null){
- rawComparator = WritableComparator.get(getHadoopClassFactory().loadClass(mapOutputKeyClass));
- }else{
- rawComparator = WritableComparator.get((Class<? extends WritableComparable>)Class.forName(mapOutputKeyClass));
- }
- this.comparatorFactory = new WritableComparingComparatorFactory(rawComparator.getClass());
- }
- }
- IOpenableDataWriterOperator op = new PreclusteredGroupOperator(new int[] { 0 },
+ if (this.comparatorFactory == null) {
+ String comparatorClassName = getJobConfMap().get(comparatorClassKey);
+ RawComparator rawComparator = null;
+ if (comparatorClassName != null) {
+ Class comparatorClazz = getHadoopClassFactory().loadClass(comparatorClassName);
+ this.comparatorFactory = new KeyComparatorFactory(comparatorClazz);
+
+ } else {
+ String mapOutputKeyClass = getJobConfMap().get("mapred.mapoutput.key.class");
+ if (getHadoopClassFactory() != null) {
+ rawComparator = WritableComparator.get(getHadoopClassFactory().loadClass(mapOutputKeyClass));
+ } else {
+ rawComparator = WritableComparator.get((Class<? extends WritableComparable>) Class
+ .forName(mapOutputKeyClass));
+ }
+ this.comparatorFactory = new WritableComparingComparatorFactory(rawComparator.getClass());
+ }
+ }
+ IOpenableDataWriterOperator op = new PreclusteredGroupOperator(new int[] { 0 },
new IComparator[] { comparatorFactory.createComparator() }, new ReducerAggregator(createReducer()));
return new DeserializedOperatorNodePushable(ctx, op, plan, getActivityNodeId());
} catch (Exception e) {
@@ -260,21 +259,25 @@
return true;
}
- @Override
- public RecordDescriptor getRecordDescriptor(JobConf conf) {
- String outputKeyClassName = conf.get("mapred.output.key.class");
- String outputValueClassName = conf.get("mapred.output.value.class");
- RecordDescriptor recordDescriptor = null;
- try{
- if(getHadoopClassFactory() == null){
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>)Class.forName(outputKeyClassName), (Class<? extends Writable>)Class.forName(outputValueClassName));
- }else{
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>)getHadoopClassFactory().loadClass(outputKeyClassName), (Class<? extends Writable>)getHadoopClassFactory().loadClass(outputValueClassName));
- }
- }catch(Exception e){
- e.printStackTrace();
- return null;
- }
- return recordDescriptor;
- }
+ @Override
+ public RecordDescriptor getRecordDescriptor(JobConf conf) {
+ String outputKeyClassName = conf.get("mapred.output.key.class");
+ String outputValueClassName = conf.get("mapred.output.value.class");
+ RecordDescriptor recordDescriptor = null;
+ try {
+ if (getHadoopClassFactory() == null) {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) Class.forName(outputKeyClassName),
+ (Class<? extends Writable>) Class.forName(outputValueClassName));
+ } else {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) getHadoopClassFactory().loadClass(outputKeyClassName),
+ (Class<? extends Writable>) getHadoopClassFactory().loadClass(outputValueClassName));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+ return recordDescriptor;
+ }
}
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java
new file mode 100644
index 0000000..79d88af
--- /dev/null
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java
@@ -0,0 +1,386 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.coreops.join;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.coreops.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
+
+public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final String SMALLRELATION = "RelR";
+ private static final String LARGERELATION = "RelS";
+ private static final String NUM_PARTITION = "NUMBER_PARTITIONS";
+ private static final long serialVersionUID = 1L;
+ private final int[] keys0;
+ private final int[] keys1;
+ private final int inputsize0;
+ private final int recordsPerFrame;
+ private final int memsize;
+ private final double factor;
+ private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+
+ private int numReadI1 = 0;
+ private int numWrite = 0;
+ private int numReadI2 = 0;
+
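+ /**
+  * @param memsize memory budget for the join, in frames (in this version only exposed via getMemorySize())
+  * @param inputsize0 estimated size of the build input (input 0), apparently a frame count given the partition-count formula below
+  * @param recordsPerFrame estimated records per frame, used to size the in-memory hash table
+  * @param factor fudge factor applied to the size estimates
+  */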
+ public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+ super(spec, 2, 1);
+ this.memsize = memsize;
+ this.inputsize0 = inputsize0;
+ this.recordsPerFrame = recordsPerFrame;
+ this.factor = factor;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ @Override
+ public void contributeTaskGraph(IActivityGraphBuilder builder) {
+ HashPartitionActivityNode rpart = new HashPartitionActivityNode(SMALLRELATION, keys0, 0);
+ HashPartitionActivityNode spart = new HashPartitionActivityNode(LARGERELATION, keys1, 1);
+ JoinActivityNode join = new JoinActivityNode();
+
+ builder.addTask(rpart);
+ builder.addSourceEdge(0, rpart, 0);
+
+ builder.addTask(spart);
+ builder.addSourceEdge(1, spart, 0);
+
+ builder.addTask(join);
+ builder.addBlockingEdge(rpart, spart);
+ builder.addBlockingEdge(spart, join);
+
+ builder.addTargetEdge(0, join, 0);
+ }
+
+ public int getMemorySize() {
+ return memsize;
+ }
+
+ private class HashPartitionActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+ private String relationName;
+ private int operatorInputIndex;
+ private int keys[];
+
+ public HashPartitionActivityNode(String relationName, int keys[], int operatorInputIndex) {
+ this.relationName = relationName;
+ this.keys = keys;
+ this.operatorInputIndex = operatorInputIndex;
+ }
+
+ @Override
+ public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
+ int partition, int nPartitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
+ final IOperatorEnvironment env, int partition, final int nPartitions) {
+ final RecordDescriptor rd0 = plan.getJobSpecification().getOperatorInputRecordDescriptor(getOperatorId(),
+ operatorInputIndex);
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ IOperatorNodePushable op = new IOperatorNodePushable() {
+ private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx, rd0);
+
+ ITuplePartitionComputer hpc = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories)
+ .createPartitioner();
+
+ private final FrameTupleAppender appender = new FrameTupleAppender(ctx);
+ private ByteBuffer[] outbufs;
+ private File[] files;
+ private FileChannel[] channels;
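+ // Partition count heuristic: ceil(sqrt(|R| * fudge factor / number of join tasks)),
+ // presumably chosen so that each spilled partition of R stays small enough for the
+ // in-memory join performed by the next activity.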
+ private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
+
+ @Override
+ public void setFrameWriter(int index, IFrameWriter writer) {
+ throw new IllegalArgumentException();
+ }
+
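+ // On close, flush any partially filled partition frames to their spill files and
+ // publish the file channels plus the partition count to the join activity through
+ // the operator environment.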
+ @Override
+ public void close() throws HyracksDataException {
+ for (int i = 0; i < numPartitions; i++) {
+ try {
+ ByteBuffer head = outbufs[i];
+ accessor0.reset(head);
+ if (accessor0.getTupleCount() > 0) {
+ FileChannel wChannel = channels[i];
+ if (wChannel == null) {
+ wChannel = new RandomAccessFile(files[i], "rw").getChannel();
+ channels[i] = wChannel;
+ }
+ wChannel.write(head);
+ numWrite++;
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException("error generating partition " + files[i].getName());
+ }
+ }
+
+ env.set(relationName, channels);
+ env.set(NUM_PARTITION, numPartitions);
+ }
+
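+ // Route each incoming tuple to its partition's output frame; when that frame is full,
+ // spill it to the partition's file, clear it, and retry the append.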
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor0.reset(buffer);
+ int tCount = accessor0.getTupleCount();
+ for (int i = 0; i < tCount; ++i) {
+
+ int entry = hpc.partition(accessor0, i, numPartitions);
+ boolean newBuffer = false;
+ ByteBuffer outbuf = outbufs[entry];
+ while (true) {
+ appender.reset(outbuf, newBuffer);
+ if (appender.append(accessor0, i)) {
+ break;
+ } else {
+ // The output frame for this partition is full, i.e. the tuple does not
+ // fit; spill the frame to disk, then retry with an emptied frame.
+ try {
+
+ FileChannel wChannel = channels[entry];
+ if (wChannel == null) {
+ wChannel = new RandomAccessFile(files[entry], "rw").getChannel();
+ channels[entry] = wChannel;
+ }
+
+ wChannel.write(outbuf);
+ numWrite++;
+ outbuf.clear();
+ newBuffer = true;
+ } catch (IOException e) {
+ throw new HyracksDataException("error generating partition "
+ + files[entry].getName());
+ }
+ }
+ }
+ }
+ }
+
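+ // Allocate one output frame and one spill file per partition.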
+ @Override
+ public void open() throws HyracksDataException {
+ outbufs = new ByteBuffer[numPartitions];
+ files = new File[numPartitions];
+ channels = new FileChannel[numPartitions];
+ for (int i = 0; i < numPartitions; i++) {
+ try {
+ files[i] = ctx.getResourceManager().createFile(relationName, null);
+ files[i].deleteOnExit();
+ outbufs[i] = ctx.getResourceManager().allocateFrame();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ };
+ return op;
+ }
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return GraceHashJoinOperatorDescriptor.this;
+ }
+
+ @Override
+ public boolean supportsPullInterface() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsPushInterface() {
+ return true;
+ }
+
+ }
+
+ private class JoinActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
+ int partition, int nPartitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
+ final IOperatorEnvironment env, int partition, int nPartitions) {
+ final RecordDescriptor rd0 = plan.getJobSpecification()
+ .getOperatorInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor rd1 = plan.getJobSpecification()
+ .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+
+ IOperatorNodePushable op = new IOperatorNodePushable() {
+ private InMemoryHashJoin joiner;
+ private ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0,
+ hashFunctionFactories).createPartitioner();
+ private ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1,
+ hashFunctionFactories).createPartitioner();
+
+ private IFrameWriter writer;
+ private FileChannel[] channelsR;
+ private FileChannel[] channelsS;
+ private int numPartitions;
+ private int[] maxBufferRi;
+
+ @Override
+ public void open() throws HyracksDataException {
+ channelsR = (FileChannel[]) env.get(SMALLRELATION);
+ channelsS = (FileChannel[]) env.get(LARGERELATION);
+ numPartitions = (Integer) env.get(NUM_PARTITION);
+
+ ITuplePartitionComputer hpcRep0 = new RepartitionComputer(numPartitions, hpc0);
+ ITuplePartitionComputer hpcRep1 = new RepartitionComputer(numPartitions, hpc1);
+
+ writer.open(); // open the downstream writer before the probe phase
+
+ maxBufferRi = new int[numPartitions];
+
+ ByteBuffer buffer = ctx.getResourceManager().allocateFrame(); // input buffer
+ int tableSize = (int) (numPartitions * recordsPerFrame * factor);
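+ // For each partition id: build an in-memory hash table from every frame of the R
+ // partition file, then probe it with every frame of the matching S partition file.
+ // The inner build loop runs to end-of-file, so the partition-count heuristic above is
+ // what keeps each R partition small enough to hold in memory.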
+ for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
+ int counter = 0;
+ int state = 0;
+ try {
+ FileChannel inChannelR = channelsR[partitionid];
+ if (inChannelR != null) {
+ inChannelR.position(0);
+ while (state != -1) {
+
+ joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx, rd0),
+ hpcRep0, new FrameTupleAccessor(ctx, rd1), hpcRep1,
+ new FrameTuplePairComparator(keys0, keys1, comparators));
+ // build
+
+ state = inChannelR.read(buffer);
+ while (state != -1) {
+
+ ByteBuffer copyBuffer = ctx.getResourceManager().allocateFrame();
+ FrameUtils.copy(buffer, copyBuffer);
+ joiner.build(copyBuffer);
+ numReadI1++;
+ counter++;
+ if (counter > maxBufferRi[partitionid])
+ maxBufferRi[partitionid] = counter;
+
+ buffer.clear();
+ state = inChannelR.read(buffer);
+ }
+
+ // probe
+
+ buffer.clear();
+
+ FileChannel inChannelS = channelsS[partitionid];
+ if (inChannelS != null) {
+ inChannelS.position(0);
+ while (inChannelS.read(buffer) != -1) {
+ joiner.join(buffer, writer);
+ numReadI2++;
+ buffer.clear();
+ }
+ inChannelS.close();
+ joiner.closeJoin(writer);
+ }
+ }
+ inChannelR.close();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ writer.close();
+ }
+
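+ // This activity has no incoming data edges (its inputs are the partition files left in
+ // the environment by the two partitioning activities), so nextFrame is never expected
+ // to be called.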
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ env.set(LARGERELATION, null);
+ env.set(SMALLRELATION, null);
+ env.set(NUM_PARTITION, null);
+ }
+
+ @Override
+ public void setFrameWriter(int index, IFrameWriter writer) {
+ if (index != 0) {
+ throw new IllegalStateException();
+ }
+ this.writer = writer;
+ }
+ };
+ return op;
+ }
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return GraceHashJoinOperatorDescriptor.this;
+ }
+
+ @Override
+ public boolean supportsPullInterface() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsPushInterface() {
+ return true;
+ }
+ }
+}
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
index 38fbc69..51fc7c1 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
@@ -80,13 +80,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition) {
+ final IOperatorEnvironment env, int partition, int nPartitions) {
final RecordDescriptor rd0 = plan.getJobSpecification()
.getOperatorInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = plan.getJobSpecification()
@@ -150,13 +150,13 @@
@Override
public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition) {
+ int partition, int nPartitions) {
throw new UnsupportedOperationException();
}
@Override
public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition) {
+ final IOperatorEnvironment env, int partition, int nPartitions) {
IOperatorNodePushable op = new IOperatorNodePushable() {
private IFrameWriter writer;
private InMemoryHashJoin joiner;
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/RepartitionComputer.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/RepartitionComputer.java
new file mode 100644
index 0000000..c5c3af8
--- /dev/null
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/RepartitionComputer.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.coreops.join;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
+
+public class RepartitionComputer implements ITuplePartitionComputer {
+ private int factor;
+ private ITuplePartitionComputer delegate;
+
+ public RepartitionComputer(int factor, ITuplePartitionComputer delegate) {
+ super();
+ this.factor = factor;
+ this.delegate = delegate;
+ }
+
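+ // Ask the delegate for a partition over factor * nParts fine-grained buckets and collapse
+ // every run of `factor` consecutive fine buckets into one, yielding a value in [0, nParts).
+ // In the Grace join this is presumably used so that tuples which all landed in the same
+ // spill partition are still spread across the buckets of the in-memory hash table.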
+ @Override
+ public int partition(FrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ return delegate.partition(accessor, tIndex, factor * nParts) / factor;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 6770fd8..64a59c4 100644
--- a/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -40,6 +40,7 @@
import edu.uci.ics.hyracks.coreops.data.StringSerializerDeserializer;
import edu.uci.ics.hyracks.coreops.file.CSVFileScanOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.file.FileSplit;
+import edu.uci.ics.hyracks.coreops.join.GraceHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.join.InMemoryHashJoinOperatorDescriptor;
public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
@@ -51,87 +52,122 @@
public void customerOrderCIDJoin() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] custSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl"))
- };
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
- FileSplit[] ordersSplits = new FileSplit[] {
- new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl"))
- };
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
- PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
- PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
- InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
- 1
- }, new int[] {
- 0
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }, custOrderJoinDesc, 128);
- PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
+ new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+ PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
join.setPartitionConstraint(joinPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ printer.setPartitionConstraint(printerPartitionConstraint);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDGraceJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
+
+ CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+ CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ custScanner.setPartitionConstraint(custPartitionConstraint);
+
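+ // Grace join arguments: memsize = 4, build-input size estimate = 10, records per
+ // frame = 200, fudge factor = 1.2, followed by the join keys for each input, the
+ // hash/comparator factories, and the output record descriptor.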
+ GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 4, 10, 200, 1.2,
+ new int[] { 1 }, new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+ PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ join.setPartitionConstraint(joinPartitionConstraint);
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -152,104 +188,143 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
- };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
- };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ '|', "'\"");
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ "'\"");
PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
- InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
- 1
- }, new int[] {
- 0
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }, custOrderJoinDesc, 128);
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
+ new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
join.setPartitionConstraint(joinPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 1
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 0
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDGraceJoinMulti() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
+
+ CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+ CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ custScanner.setPartitionConstraint(custPartitionConstraint);
+
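+ // Second Grace join test: the join operator is constrained to both node controllers and
+ // the inputs are redistributed by M-to-N hash-partitioning connectors before the join.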
+ GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
+ new int[] { 1 }, new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+ PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ join.setPartitionConstraint(joinPartitionConstraint);
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ printer.setPartitionConstraint(printerPartitionConstraint);
+
+ IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -264,101 +339,65 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
- };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
- };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ '|', "'\"");
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ "'\"");
PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
- InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
- 1
- }, new int[] {
- 0
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }, custOrderJoinDesc, 128);
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
+ new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
join.setPartitionConstraint(new PartitionCountConstraint(2));
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 1
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 0
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -373,114 +412,75 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
- };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
- };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ '|', "'\"");
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ "'\"");
PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc);
ordMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- }));
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
MaterializingOperatorDescriptor custMat = new MaterializingOperatorDescriptor(spec, custDesc);
custMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- }));
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
- InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
- 1
- }, new int[] {
- 0
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }, custOrderJoinDesc, 128);
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
+ new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
join.setPartitionConstraint(joinPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 1
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
IConnectorDescriptor custPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 0
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(custPartConn, custScanner, 0, custMat, 0);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);