yura

Reputation: 14655

How to implement an OR join in Hadoop (Scalding/Cascading)

It is easy to join datasets on a single key by simply sending the join field as the reducer key. But joining records on several keys, where at least one should be the same, is not that easy for me.

Example: I have logs and I want to group them by user parameters; I want to join them by (ipAddress, sessionId, visitorCookies).

So log1 should be grouped with log2 if log1.ip == log2.ip OR log1.session == log2.session OR log1.cookie == log2.cookie. Perhaps it is possible to create a composite key, or to use some probabilistic approach like MinHash...

Is it possible?

Upvotes: 2

Views: 1284

Answers (5)

samthebest

Reputation: 31543

Tip: Use type aliases to make your Scalding code nice to read

Note 0: This solution is particularly nice because it will always be just 1 mapred job, even when there are more keys to join on.

Note 1: This assumes each pipe has no duplicate keys; otherwise you would have to make 'key also carry an index indicating which log it came from, the mapTo would become a flatMapTo, and things would be a bit more complicated.

Note 2: For simplicity this discards the joining fields; to keep them you'd need a big ugly tuple (ip1, ip2, session1, session2, etc.). If you really want, I can write out an example that keeps them.

Note 3: If you really wanted to merge the duplicate values, you could follow this with a groupBy on each of 'logEntry1 and 'logEntry2, produce a logEntryList, then concatenate (as mentioned in a comment, this is not normal for a join). This will create 2 more mapred jobs.

import com.twitter.scalding._
import cascading.pipe.Pipe

type String2 = (String, String)
type String3 = (String, String, String)

// Emit each log entry once per join field, keyed by that field's value
// prefixed with its position, so values from different fields never collide.
def addKey(log: Pipe): Pipe = log.flatMap[String3, String](('ip, 'session, 'cookie) -> 'key)(
  _.productIterator.toList.zipWithIndex.map {
    case (key: String, index: Int) => index.toString + key
  }
)

// Union the two keyed pipes, group the entries that share a key value, and
// emit each matched pair of log entries.
(addKey(log1) ++ addKey(log2)).groupBy('key)(_.toList[String]('logEntry -> 'group))
  .mapTo[Iterable[String], String2]('group -> ('logEntry1, 'logEntry2))(list => (list.head, list.last))
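
For the first record in Joe K's example below, addKey would emit the keys "01.2.3.4", "1ABC" and "2123": the positional prefix plays the same role as Joe's ip_/session_/cookie_ prefixes, preventing values from different fields from colliding.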

Upvotes: 0

imgr8

Reputation: 511

For Cascading, I ended up creating a Filter which checks whether any of the conditions inside the OR is true. Cascading filters return a true/false decision per tuple, which can be used to discard the tuples that match none of the conditions.
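
In Scalding's fields API, such a filter might look like the following sketch. It assumes the candidate pairs are already side by side in one pipe (e.g. after a join or a cross); the pipe and field names (candidatePairs, 'ip1, 'ip2, ...) are illustrative, not from the answer.

// Keep a candidate pair only if at least one of the join fields matches.
val orMatched = candidatePairs
  .filter[(String, String, String, String, String, String)](
    ('ip1, 'session1, 'cookie1, 'ip2, 'session2, 'cookie2)) {
      case (ip1, s1, c1, ip2, s2, c2) => ip1 == ip2 || s1 == s2 || c1 == c2
  }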

Upvotes: 0

Paco

Reputation: 666

Can you describe more about "joining records by several keys"?

If you know the points in your workflow where particular keys could be joined, probably the best approach would be to define a flow with multiple joins, rather than trying to manipulate a complex data structure that resolves N keys in one step.

Here's a sample app which shows how to handle different kinds of joins in Cascading: https://github.com/Cascading/CoPA
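
As a hedged sketch, that multiple-join flow could look like this in Scalding's fields API; the pipes and field names are assumptions, and log2's fields are renamed so the two sides don't collide:

// One equi-join per key, then concatenate the results. Pairs that match
// on more than one key will appear more than once; see the deduplication
// step in the next answer.
val log2r = log2.rename(('ip, 'session, 'cookie, 'logEntry) ->
                        ('ip2, 'session2, 'cookie2, 'logEntry2))

val byIp      = log1.joinWithSmaller('ip -> 'ip2, log2r)
val bySession = log1.joinWithSmaller('session -> 'session2, log2r)
val byCookie  = log1.joinWithSmaller('cookie -> 'cookie2, log2r)

val orJoined = byIp ++ bySession ++ byCookie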

Upvotes: 0

ericschwarzkopf

Reputation: 661

After you created the separate joins as described by Joe above, you need to get rid of the duplicates. Two tuples in your data are duplicates iff they are equal in all fields you use in your "OR-join". So if you do a natural join on a key representing all relevant fields afterwards, you'll get all duplicates grouped together. Hence you can replace them by a single occurrence of the respective tuple.

Let's look at an example: Assume you've got tuples with fields (A,B,C,D) and the fields you're interested in are A, B, and C. You would first do equi-joins on A, B, and C separately. For each, you'd join the initial tuple stream with itself. Denote the first stream with (A0, B0, C0, D0) and the second with (A1, B1, C1, D1). The result will be tuples (A0, B0, C0, D0, A1, B1, C1, D1). For each of those tuples, you'd create a tuple (A0A1B0B1C0C1, A0, B0, C0, D0, A1, B1, C1, D1), so all duplicates will be grouped together in a subsequent reducer. For each group, return only one of the contained tuples.
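
Continuing the sketch from the previous answer (same assumed pipe and field names), the deduplication step might look like:

// Build a composite key from all join fields on both sides, group on it,
// and keep a single representative tuple per group.
val deduped = orJoined
  .map(('ip, 'ip2, 'session, 'session2, 'cookie, 'cookie2) -> 'dupKey) {
    t: (String, String, String, String, String, String) =>
      t.productIterator.mkString("|")
  }
  .groupBy('dupKey)(_.take(1))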

Upvotes: 0

Joe K

Reputation: 18434

The issue is that MapReduce joins are typically implemented by giving records that match on some field the same reduce key, so that they get sent to the same reducer. Anything that gets around this is going to be a bit of a hack, but it is possible...

Here's what I would recommend: for each input record, generate three copies, each with a new "key" field whose value is the source field's name prefixed to its value. So, for example, say you had the following input:

(ip=1.2.3.4, session=ABC, cookie=123)
(ip=3.4.5.6, session=DEF, cookie=456)

Then you would generate

(ip=1.2.3.4, session=ABC, cookie=123, key=ip_1.2.3.4)
(ip=1.2.3.4, session=ABC, cookie=123, key=session_ABC)
(ip=1.2.3.4, session=ABC, cookie=123, key=cookie_123)
(ip=3.4.5.6, session=DEF, cookie=456, key=ip_3.4.5.6)
(ip=3.4.5.6, session=DEF, cookie=456, key=session_DEF)
(ip=3.4.5.6, session=DEF, cookie=456, key=cookie_456)

And then you could simply group on this new field.
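
In Scalding, this copy-generation step could be a single flatMap; it is essentially what samthebest's addKey above does with positional prefixes instead of field names. The pipe and field names are assumed as in the question.

// Emit one copy of each record per join field, keyed by "<field>_<value>".
val keyed = logs.flatMap[(String, String, String), String](
  ('ip, 'session, 'cookie) -> 'key) {
    case (ip, session, cookie) =>
      List("ip_" + ip, "session_" + session, "cookie_" + cookie)
}

// keyed.groupBy('key) now brings together all records that share any one
// of the three key values.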

I'm not too familiar with Scalding/Cascading (although I've been meaning to learn more about them), but this would definitely conform to how joins are generally done in Hadoop.

Upvotes: 0
