Joins

The movie dataset comes in multiple parts: users, movies, and ratings. It is natural to ask questions that span more than one of them:

  • What is the mean rating per genre?
  • What is the favorite movie for each occupation?
  • What genres are most preferred by women vs men?

Joins let us combine multiple datasets to answer questions like these.

In [1]:
import hail as hl
import seaborn

hl.utils.get_movie_lens('data/')

users = hl.read_table('data/users.ht')
movies = hl.read_table('data/movies.ht')
ratings = hl.read_table('data/ratings.ht')
Initializing Spark and Hail with default parameters...
Running on Apache Spark version 2.2.0
SparkUI available at http://172.31.20.142:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version devel-f7631a0c96cd
NOTE: This is a beta version. Interfaces may change
  during the beta period. We recommend pulling
  the latest changes weekly.
2018-07-16 22:05:45 Hail: INFO: Movie Lens files found!

To understand joins in Hail, we need to revisit one of the crucial properties of Tables: the key.

A Table has an ordered list of fields known as the key. The last section of the describe output shows the key.

In [2]:
users.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'id': int32
    'age': int32
    'sex': str
    'occupation': str
    'zipcode': str
----------------------------------------
Key: ['id']
----------------------------------------

The key attribute of a Table is a struct expression containing all of the key fields.

In [3]:
users.key.describe()
--------------------------------------------------------
Type:
    struct {
        id: int32
    }
--------------------------------------------------------
Source:
    <hail.table.Table object at 0x7f57e6e73160>
Index:
    ['row']
--------------------------------------------------------

Keys need not be unique or non-missing, although in many applications they will be both.

Hail’s join syntax is most easily understood through an example.

In [4]:
t1 = hl.Table.parallelize([
    {'a': 'foo', 'b': 1},
    {'a': 'bar', 'b': 2},
    {'a': 'bar', 'b': 2}],
    hl.tstruct(a=hl.tstr, b=hl.tint32),
    key='a')
t2 = hl.Table.parallelize([
    {'t': 'foo', 'x': 3.14},
    {'t': 'bar', 'x': 2.78},
    {'t': 'bar', 'x': -1},
    {'t': 'quam', 'x': 0}],
    hl.tstruct(t=hl.tstr, x=hl.tfloat64),
    key='t')
In [5]:
t1.show()
+-----+-------+
| a   |     b |
+-----+-------+
| str | int32 |
+-----+-------+
| foo |     1 |
| bar |     2 |
| bar |     2 |
+-----+-------+

In [6]:
t2.show()
+------+--------------+
| t    |            x |
+------+--------------+
| str  |      float64 |
+------+--------------+
| foo  |  3.14000e+00 |
| bar  |  2.78000e+00 |
| bar  | -1.00000e+00 |
| quam |  0.00000e+00 |
+------+--------------+

In [7]:
j = t1.annotate(t2_x = t2[t1.a].x)
j.show()
2018-07-16 22:05:48 Hail: INFO: Ordering unsorted dataset with network shuffle
2018-07-16 22:05:48 Hail: INFO: Coerced sorted dataset
+-----+-------+-------------+
| a   |     b |        t2_x |
+-----+-------+-------------+
| str | int32 |     float64 |
+-----+-------+-------------+
| bar |     2 | 2.78000e+00 |
| bar |     2 | 2.78000e+00 |
| foo |     1 | 3.14000e+00 |
+-----+-------+-------------+

The magic of keys is that they turn tables into maps: table[expr] refers to the row of table whose key equals the value of expr. Note: if more than one row has that key, one of them is chosen arbitrarily.

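Here’s a subtle bit: if expr is an expression indexed by row of table2, then table[expr] is also an expression indexed by row of table2. That is why t2[t1.a] can be used inside t1.annotate above, and why the describe output below lists t1 (compare the object addresses) as the Source of t2[t1.a].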

Also note that while they look similar, table['field1'] and table[table2.key] do very different things: the first selects the field named field1 from table, while the second joins table to table2 by key.
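The map analogy can be sketched in plain Python. This is only an illustration of the lookup semantics, not how Hail implements joins; the names t2_by_key and lookup_x are made up for this sketch, and keeping the first duplicate is an arbitrary choice that Hail does not promise.

```python
# Plain-Python analogy for `t1.annotate(t2_x = t2[t1.a].x)`. Not Hail code.
t1_rows = [
    {'a': 'foo', 'b': 1},
    {'a': 'bar', 'b': 2},
    {'a': 'bar', 'b': 2},
]
t2_rows = [
    {'t': 'foo', 'x': 3.14},
    {'t': 'bar', 'x': 2.78},
    {'t': 'bar', 'x': -1.0},
    {'t': 'quam', 'x': 0.0},
]

# Turn t2 into a map from key to a single row. For duplicate keys this
# sketch keeps the first row seen (an assumption made for illustration).
t2_by_key = {}
for row in t2_rows:
    t2_by_key.setdefault(row['t'], row)


def lookup_x(key):
    """Return the x field of the t2 row with this key, or None if absent."""
    row = t2_by_key.get(key)
    return None if row is None else row['x']


# Annotate every row of t1 with the looked-up value. The result has exactly
# one row per row of t1, however many rows of t2 share the key.
joined = [{**row, 't2_x': lookup_x(row['a'])} for row in t1_rows]
```

The result always has the same number of rows as t1, which is the main difference from a SQL-style join.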

In [8]:
t1
Out[8]:
<hail.table.Table at 0x7f581c150b38>
In [9]:
t2[t1.a].describe()
--------------------------------------------------------
Type:
    struct {
        x: float64
    }
--------------------------------------------------------
Source:
    <hail.table.Table object at 0x7f581c150b38>
Index:
    ['row']
--------------------------------------------------------

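Now let’s use joins to compute the average movie rating per genre. We first compute the mean rating of each movie and then average those means within each genre, so every movie counts equally regardless of how many ratings it received.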

In [10]:
t = (ratings.group_by(ratings.movie_id)
     .aggregate(rating = hl.agg.mean(ratings.rating)))
t.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'movie_id': int32
    'rating': float64
----------------------------------------
Key: ['movie_id']
----------------------------------------
In [11]:
# now join in the movie genre
t = t.annotate(genres = movies[t.movie_id].genres)
t.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'movie_id': int32
    'rating': float64
    'genres': array<str>
----------------------------------------
Key: ['movie_id']
----------------------------------------
In [12]:
t.show()
2018-07-16 22:05:49 Hail: INFO: Ordering unsorted dataset with network shuffle
+----------+-------------+-------------------------------------+
| movie_id |      rating | genres                              |
+----------+-------------+-------------------------------------+
|    int32 |     float64 | array<str>                          |
+----------+-------------+-------------------------------------+
|        1 | 3.87832e+00 | ["Animation","Children's","Comedy"] |
|        2 | 3.20611e+00 | ["Action","Adventure","Thriller"]   |
|        3 | 3.03333e+00 | ["Thriller"]                        |
|        4 | 3.55024e+00 | ["Action","Comedy","Drama"]         |
|        5 | 3.30233e+00 | ["Crime","Drama","Thriller"]        |
|        6 | 3.57692e+00 | ["Drama"]                           |
|        7 | 3.79847e+00 | ["Drama","Sci-Fi"]                  |
|        8 | 3.99543e+00 | ["Children's","Comedy","Drama"]     |
|        9 | 3.89632e+00 | ["Drama"]                           |
|       10 | 3.83146e+00 | ["Drama","War"]                     |
+----------+-------------+-------------------------------------+
showing top 10 rows

Explode

Now we want to group by genres, but they’re packed up in an array. To unpack the genres, we can use explode. explode creates a new row for each element of the given field, which must be a collection (an array or a set). The other fields are copied into each new row.

In [13]:
t = t.explode(t.genres)
t.show()
2018-07-16 22:05:52 Hail: INFO: Ordering unsorted dataset with network shuffle
+----------+-------------+------------+
| movie_id |      rating | genres     |
+----------+-------------+------------+
|    int32 |     float64 | str        |
+----------+-------------+------------+
|        1 | 3.87832e+00 | Animation  |
|        1 | 3.87832e+00 | Children's |
|        1 | 3.87832e+00 | Comedy     |
|        2 | 3.20611e+00 | Action     |
|        2 | 3.20611e+00 | Adventure  |
|        2 | 3.20611e+00 | Thriller   |
|        3 | 3.03333e+00 | Thriller   |
|        4 | 3.55024e+00 | Action     |
|        4 | 3.55024e+00 | Comedy     |
|        4 | 3.55024e+00 | Drama      |
+----------+-------------+------------+
showing top 10 rows
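For intuition, the effect of explode on a list of rows can be sketched in plain Python. This is an analogy only, not Hail's implementation; the helper explode and the row with movie_id 99 are hypothetical, added to show that in this sketch an empty collection contributes no rows.

```python
# Plain-Python analogy for `t.explode(t.genres)`. Not Hail code.
rows = [
    {'movie_id': 1, 'rating': 3.87832,
     'genres': ["Animation", "Children's", "Comedy"]},
    {'movie_id': 3, 'rating': 3.03333, 'genres': ["Thriller"]},
    {'movie_id': 99, 'rating': 4.0, 'genres': []},  # hypothetical row
]


def explode(rows, field):
    """Emit one output row per element of row[field], copying other fields."""
    out = []
    for row in rows:
        for element in row[field]:
            # Replace the collection with a single element of it.
            out.append({**row, field: element})
    return out


exploded = explode(rows, 'genres')
for row in exploded:
    print(row['movie_id'], row['rating'], row['genres'])
```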

In [14]:
t = (t.group_by(t.genres)
     .aggregate(rating = hl.agg.mean(t.rating)))
# save the intermediate result
t = t.cache()
t.show(n=100)
2018-07-16 22:05:53 Hail: INFO: Ordering unsorted dataset with network shuffle
2018-07-16 22:05:56 Hail: INFO: Ordering unsorted dataset with network shuffle
+-------------+-------------+
| genres      |      rating |
+-------------+-------------+
| str         |     float64 |
+-------------+-------------+
| Action      | 2.96633e+00 |
| Adventure   | 3.14397e+00 |
| Animation   | 3.29881e+00 |
| Children's  | 2.91688e+00 |
| Comedy      | 3.00056e+00 |
| Crime       | 3.21101e+00 |
| Documentary | 3.22927e+00 |
| Drama       | 3.18735e+00 |
| Fantasy     | 2.84983e+00 |
| Film-Noir   | 3.54835e+00 |
| Horror      | 2.73016e+00 |
| Musical     | 3.37642e+00 |
| Mystery     | 3.33681e+00 |
| Romance     | 3.24405e+00 |
| Sci-Fi      | 3.16545e+00 |
| Thriller    | 3.13669e+00 |
| War         | 3.48919e+00 |
| Western     | 3.18562e+00 |
+-------------+-------------+
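The group-and-average step can be mimicked in plain Python as a rough sketch. It uses only the ten exploded rows shown earlier, so its numbers differ from the full table above, and the names totals and mean_by_genre are invented for illustration.

```python
# Plain-Python analogy for group_by(...).aggregate(hl.agg.mean(...)).
from collections import defaultdict

# (genre, per-movie mean rating) pairs copied from the exploded table above.
exploded = [
    ("Animation", 3.87832), ("Children's", 3.87832), ("Comedy", 3.87832),
    ("Action", 3.20611), ("Adventure", 3.20611), ("Thriller", 3.20611),
    ("Thriller", 3.03333),
    ("Action", 3.55024), ("Comedy", 3.55024), ("Drama", 3.55024),
]

# Accumulate a running [sum, count] per genre.
totals = defaultdict(lambda: [0.0, 0])
for genre, rating in exploded:
    totals[genre][0] += rating
    totals[genre][1] += 1

# One output entry per distinct genre, ordered by genre name.
mean_by_genre = {genre: s / n for genre, (s, n) in sorted(totals.items())}
```

Each movie contributes one value per genre it belongs to, so a movie with three genres influences three of the averages.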

Ordering

We can sort tables using order_by. The default order is ascending, but you can control the direction of each sort field with hl.asc and hl.desc.

In [15]:
t = t.order_by(hl.desc(t.rating))
t.show(n=100)
+-------------+-------------+
| genres      |      rating |
+-------------+-------------+
| str         |     float64 |
+-------------+-------------+
| Film-Noir   | 3.54835e+00 |
| War         | 3.48919e+00 |
| Musical     | 3.37642e+00 |
| Mystery     | 3.33681e+00 |
| Animation   | 3.29881e+00 |
| Romance     | 3.24405e+00 |
| Documentary | 3.22927e+00 |
| Crime       | 3.21101e+00 |
| Drama       | 3.18735e+00 |
| Western     | 3.18562e+00 |
| Sci-Fi      | 3.16545e+00 |
| Adventure   | 3.14397e+00 |
| Thriller    | 3.13669e+00 |
| Comedy      | 3.00056e+00 |
| Action      | 2.96633e+00 |
| Children's  | 2.91688e+00 |
| Fantasy     | 2.84983e+00 |
| Horror      | 2.73016e+00 |
+-------------+-------------+
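As a loose plain-Python analogy (not Hail code), sorting by a field in descending order corresponds to sorted with reverse=True. The four rows are copied from the table above.

```python
# Plain-Python analogy for t.order_by(hl.desc(t.rating)).
rows = [
    {'genres': 'Action', 'rating': 2.96633},
    {'genres': 'Film-Noir', 'rating': 3.54835},
    {'genres': 'Horror', 'rating': 2.73016},
    {'genres': 'War', 'rating': 3.48919},
]

# Ascending is the default; reverse=True gives descending order.
ascending = sorted(rows, key=lambda r: r['rating'])
descending = sorted(rows, key=lambda r: r['rating'], reverse=True)
```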

Tables also have a SQL-style join method, Table.join, whose how parameter selects an inner, left, right, or outer join.

SQL-style joins for MatrixTable are coming soon.
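To see how a SQL-style join differs from the table[expr] lookup used earlier, here is a plain-Python sketch of standard SQL inner and left join semantics. It is not Hail's implementation, and the helper join and its parameters are hypothetical. The point to notice is that duplicate keys produce every matching pair of rows, whereas the lookup form keeps one row per row of the indexing table.

```python
# Plain-Python sketch of SQL inner/left join semantics. Not Hail code.
t1_rows = [
    {'a': 'foo', 'b': 1},
    {'a': 'bar', 'b': 2},
    {'a': 'bar', 'b': 2},
]
t2_rows = [
    {'t': 'foo', 'x': 3.14},
    {'t': 'bar', 'x': 2.78},
    {'t': 'bar', 'x': -1.0},
    {'t': 'quam', 'x': 0.0},
]


def join(left, right, left_key, right_key, how='inner'):
    """Join two non-empty lists of dict rows; `how` is 'inner' or 'left'."""
    right_fields = [k for k in right[0] if k != right_key]
    right_by_key = {}
    for row in right:
        right_by_key.setdefault(row[right_key], []).append(row)

    out = []
    for lrow in left:
        matches = right_by_key.get(lrow[left_key], [])
        # Every matching pair of rows yields one output row.
        for rrow in matches:
            out.append({**lrow, **{k: rrow[k] for k in right_fields}})
        # A left join keeps unmatched left rows, padded with None.
        if not matches and how == 'left':
            out.append({**lrow, **{k: None for k in right_fields}})
    return out


inner = join(t2_rows, t1_rows, 't', 'a', how='inner')
left = join(t2_rows, t1_rows, 't', 'a', how='left')
```

Here the two 'bar' rows on each side pair up into four output rows, and only the left join keeps the unmatched 'quam' row.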

Exercises

  • What is the favorite movie for each occupation?
  • What genres are rated most differently by men and women?