Table Joins Tutorial
This tutorial walks through some ways to join Hail tables. We’ll use a simple movie dataset to illustrate. The movie dataset comes in multiple parts. Here are a few questions we might naturally ask about the dataset:
- What is the mean rating per genre?
- What is the favorite movie for each occupation?
- What genres are most preferred by women vs men?
We’ll use joins to combine datasets in order to answer these questions.
Let’s initialize Hail, fetch the tutorial data, and load three tables: users, movies, and ratings.
In [1]:
import hail as hl
import seaborn
hl.utils.get_movie_lens('data/')
users = hl.read_table('data/users.ht')
movies = hl.read_table('data/movies.ht')
ratings = hl.read_table('data/ratings.ht')
Initializing Spark and Hail with default parameters...
Running on Apache Spark version 2.2.0
SparkUI available at http://10.56.40.9:4040
Welcome to
__ __ <>__
/ /_/ /__ __/ /
/ __ / _ `/ / /
/_/ /_/\_,_/_/_/ version devel-90a5cab4aab8
NOTE: This is a beta version. Interfaces may change
during the beta period. We recommend pulling
the latest changes weekly.
LOGGING: writing to /hail/repo/hail/build/tmp/python/hail/docs/tutorials/hail-20181015-1344-devel-90a5cab4aab8.log
2018-10-15 13:44:25 Hail: INFO: Movie Lens files found!
The Key to Understanding Joins
To understand joins in Hail, we need to revisit one of the crucial properties of tables: the key.
A table has an ordered list of fields known as the key. Our users table has one key field, id. We can see all the fields of a table, as well as its key, by calling describe().
In [2]:
users.describe()
----------------------------------------
Global fields:
None
----------------------------------------
Row fields:
'id': int32
'age': int32
'sex': str
'occupation': str
'zipcode': str
----------------------------------------
Key: ['id']
----------------------------------------
key is a struct expression of all of the key fields, so we can refer to the key of a table without explicitly specifying the names of the key fields.
In [3]:
users.key.describe()
--------------------------------------------------------
Type:
struct {
id: int32
}
--------------------------------------------------------
Source:
<hail.table.Table object at 0x7f294cbe7710>
Index:
['row']
--------------------------------------------------------
Keys need not be unique or non-missing, although in many applications they will be both.
When tables are joined in Hail, they are joined based on their keys. In order to join two tables, their keys must have the same number of fields, with the same types (e.g. string vs. integer), in the same order.
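The compatibility rule can be modelled in a few lines of plain Python. This is only a sketch of the rule as the tutorial states it, not Hail's own check: the helper name keys_compatible and the representation of a key as a list of (field name, type name) pairs are assumptions made for illustration.

```python
def keys_compatible(left_key, right_key):
    """Return True if two keys satisfy the join rule described above.

    Each key is a list of (field_name, type_name) pairs in key order.
    Field names are not compared; only the number of fields, their
    types, and their order matter.
    """
    left_types = [type_name for _, type_name in left_key]
    right_types = [type_name for _, type_name in right_key]
    return left_types == right_types


# One string key field on each side, under different names.
print(keys_compatible([("a", "str")], [("t", "str")]))

# Same types, but in a different order.
print(keys_compatible([("id", "int32"), ("name", "str")],
                      [("name", "str"), ("id", "int32")]))
```

The first call reports a compatible pair and the second does not, because the key types are compared position by position.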
Let’s look at a simple example of a join. We’ll use the Table.parallelize() method to create two small tables, t1 and t2.
In [4]:
t1 = hl.Table.parallelize([
    {'a': 'foo', 'b': 1},
    {'a': 'bar', 'b': 2},
    {'a': 'bar', 'b': 2}],
    hl.tstruct(a=hl.tstr, b=hl.tint32),
    key='a')
t2 = hl.Table.parallelize([
    {'t': 'foo', 'x': 3.14},
    {'t': 'bar', 'x': 2.78},
    {'t': 'bar', 'x': -1},
    {'t': 'quam', 'x': 0}],
    hl.tstruct(t=hl.tstr, x=hl.tfloat64),
    key='t')
In [5]:
t1.show()
2018-10-15 13:44:28 Hail: INFO: Coerced sorted dataset
2018-10-15 13:44:28 Hail: INFO: Coerced dataset with out-of-order partitions.
+-------+-------+
| a | b |
+-------+-------+
| str | int32 |
+-------+-------+
| "bar" | 2 |
| "bar" | 2 |
| "foo" | 1 |
+-------+-------+
In [6]:
t2.show()
2018-10-15 13:44:28 Hail: INFO: Ordering unsorted dataset with network shuffle
+--------+-----------+
| t | x |
+--------+-----------+
| str | float64 |
+--------+-----------+
| "bar" | 2.78e+00 |
| "bar" | -1.00e+00 |
| "foo" | 3.14e+00 |
| "quam" | 0.00e+00 |
+--------+-----------+
Now, we can join the tables.
In [7]:
j = t1.annotate(t2_x = t2[t1.a].x)
j.show()
2018-10-15 13:44:29 Hail: INFO: Coerced sorted dataset
2018-10-15 13:44:29 Hail: INFO: Coerced dataset with out-of-order partitions.
2018-10-15 13:44:29 Hail: INFO: Ordering unsorted dataset with network shuffle
+-------+-------+----------+
| a | b | t2_x |
+-------+-------+----------+
| str | int32 | float64 |
+-------+-------+----------+
| "bar" | 2 | 2.78e+00 |
| "bar" | 2 | 2.78e+00 |
| "foo" | 1 | 3.14e+00 |
+-------+-------+----------+
Let’s break this syntax down.
t2[t1.a] is an expression referring to the row of table t2 whose key has the value t1.a. So this expression creates a map between the keys of t1 and the rows of t2. You can view this mapping directly:
In [8]:
t2[t1.a].show()
2018-10-15 13:44:30 Hail: INFO: Coerced sorted dataset
2018-10-15 13:44:30 Hail: INFO: Coerced dataset with out-of-order partitions.
2018-10-15 13:44:30 Hail: INFO: Ordering unsorted dataset with network shuffle
+-------+----------+
| a | <expr>.x |
+-------+----------+
| str | float64 |
+-------+----------+
| "bar" | 2.78e+00 |
| "bar" | 2.78e+00 |
| "foo" | 3.14e+00 |
+-------+----------+
Since we only want the field x from t2, we can select it with t2[t1.a].x. Then we add this field to t1 with the annotate() method. The new joined table j has a field t2_x that comes from the rows of t2. The tables could be joined because they share the same number of key fields (1) and the same key type (string). The keys do not need to share the same name. Notice that the rows with keys present in t2 but not in t1 do not show up in the final result. This join syntax performs a left join. Tables also have a SQL-style inner/left/right/outer join() method.
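To make the four join types concrete, here is a small plain-Python sketch of which key values survive under each one, using the key values of t1 and t2 from above. It follows the usual SQL meaning of inner/left/right/outer and does not call Hail; the function name surviving_keys is an assumption made for illustration.

```python
def surviving_keys(left_keys, right_keys, how):
    """Return the set of key values kept by a SQL-style join."""
    left, right = set(left_keys), set(right_keys)
    if how == "inner":
        return left & right   # keys present on both sides
    if how == "left":
        return left           # every key of the left table
    if how == "right":
        return right          # every key of the right table
    if how == "outer":
        return left | right   # keys present on either side
    raise ValueError(f"unknown join type: {how}")


t1_keys = ["foo", "bar", "bar"]
t2_keys = ["foo", "bar", "bar", "quam"]

for how in ("inner", "left", "right", "outer"):
    print(how, sorted(surviving_keys(t1_keys, t2_keys, how)))
```

With this data the key "quam" appears only in the right and outer results, which matches its absence from the left join shown above.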
The magic of keys is that they can be used to create a mapping, like a Python dictionary, between the keys of one table and the row values of another table: table[expr] will refer to the row of table that has a key value of expr. If the row is not unique, one such row is chosen arbitrarily.
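The dictionary analogy can be spelled out in plain Python with the same rows as t1 and t2. This is a sketch of the idea only, not how Hail implements joins. Two choices here are assumptions of the sketch: the first row seen for a duplicate key is the one kept, and None stands in for a missing value when a key has no match.

```python
t1_rows = [
    {"a": "foo", "b": 1},
    {"a": "bar", "b": 2},
    {"a": "bar", "b": 2},
]
t2_rows = [
    {"t": "foo", "x": 3.14},
    {"t": "bar", "x": 2.78},
    {"t": "bar", "x": -1.0},
    {"t": "quam", "x": 0.0},
]

# Map each key value of t2 to one of its rows. For a duplicate key,
# setdefault keeps the first row seen (an arbitrary choice).
t2_by_key = {}
for row in t2_rows:
    t2_by_key.setdefault(row["t"], row)


def lookup_x(key):
    """Model of t2[key].x: the x field of the matching row, or None."""
    row = t2_by_key.get(key)
    return None if row is None else row["x"]


# Model of t1.annotate(t2_x = t2[t1.a].x): every row of t1 is kept.
joined = [dict(row, t2_x=lookup_x(row["a"])) for row in t1_rows]
for row in joined:
    print(row)
```

Every row of t1 appears in joined exactly once, and "quam" never appears because no row of t1 asks for it.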
Here’s a subtle bit: if expr is an expression indexed by a row of table2, then table[expr] is also an expression indexed by a row of table2.
Also note that while they look similar, table['field'] and table1[table2.key] are doing very different things! table['field'] selects a field from the table, while table1[table2.key] creates a mapping between the keys of table2 and the rows of table1.
In [9]:
t1['a'].describe()
--------------------------------------------------------
Type:
str
--------------------------------------------------------
Source:
<hail.table.Table object at 0x7f294c946550>
Index:
['row']
--------------------------------------------------------
In [10]:
t2[t1.a].describe()
--------------------------------------------------------
Type:
struct {
x: float64
}
--------------------------------------------------------
Source:
<hail.table.Table object at 0x7f294c946550>
Index:
['row']
--------------------------------------------------------
Joining Tables
Now that we understand the basics of how joins work, let’s use a join to compute the average movie rating per genre.
We have a table ratings, which contains user_id, movie_id, and rating fields. Group by movie_id and aggregate to get the mean rating of each movie.
In [11]:
t = (ratings.group_by(ratings.movie_id)
.aggregate(rating = hl.agg.mean(ratings.rating)))
t.describe()
----------------------------------------
Global fields:
None
----------------------------------------
Row fields:
'movie_id': int32
'rating': float64
----------------------------------------
Key: ['movie_id']
----------------------------------------
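For readers new to grouped aggregation, the same group-then-average step can be sketched in plain Python. The three sample rows below are hypothetical and are not taken from the MovieLens data; they only show the shape of the computation.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical rows with the same fields as the ratings table.
sample_ratings = [
    {"user_id": 1, "movie_id": 10, "rating": 4.0},
    {"user_id": 2, "movie_id": 10, "rating": 2.0},
    {"user_id": 1, "movie_id": 20, "rating": 5.0},
]

# Group: collect the ratings of each movie under its movie_id.
ratings_by_movie = defaultdict(list)
for row in sample_ratings:
    ratings_by_movie[row["movie_id"]].append(row["rating"])

# Aggregate: reduce each group to its mean.
mean_rating = {movie_id: mean(values)
               for movie_id, values in ratings_by_movie.items()}
print(mean_rating)
```

The result has one entry per distinct movie_id, just as the aggregated table is keyed by movie_id.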
To get the mean rating by genre, we need to join in the genres field from the movies table.
In [12]:
t = t.annotate(genres = movies[t.movie_id].genres)
t.describe()
----------------------------------------
Global fields:
None
----------------------------------------
Row fields:
'movie_id': int32
'rating': float64
'genres': array<str>
----------------------------------------
Key: ['movie_id']
----------------------------------------
In [13]:
t.show()
2018-10-15 13:44:32 Hail: INFO: Ordering unsorted dataset with network shuffle
+----------+----------+-------------------------------------+
| movie_id | rating | genres |
+----------+----------+-------------------------------------+
| int32 | float64 | array<str> |
+----------+----------+-------------------------------------+
| 1 | 3.88e+00 | ["Animation","Children's","Comedy"] |
| 2 | 3.21e+00 | ["Action","Adventure","Thriller"] |
| 3 | 3.03e+00 | ["Thriller"] |
| 4 | 3.55e+00 | ["Action","Comedy","Drama"] |
| 5 | 3.30e+00 | ["Crime","Drama","Thriller"] |
| 6 | 3.58e+00 | ["Drama"] |
| 7 | 3.80e+00 | ["Drama","Sci-Fi"] |
| 8 | 4.00e+00 | ["Children's","Comedy","Drama"] |
| 9 | 3.90e+00 | ["Drama"] |
| 10 | 3.83e+00 | ["Drama","War"] |
+----------+----------+-------------------------------------+
showing top 10 rows
We want to group the ratings by genre, but they’re packed up in an array. To unpack the genres, we can use explode.
explode creates a new row for each element in the value of the field, which must be a collection (array or set).
In [14]:
t = t.explode(t.genres)
t.show()
2018-10-15 13:44:33 Hail: INFO: Ordering unsorted dataset with network shuffle
+----------+----------+--------------+
| movie_id | rating | genres |
+----------+----------+--------------+
| int32 | float64 | str |
+----------+----------+--------------+
| 1 | 3.88e+00 | "Animation" |
| 1 | 3.88e+00 | "Children's" |
| 1 | 3.88e+00 | "Comedy" |
| 2 | 3.21e+00 | "Action" |
| 2 | 3.21e+00 | "Adventure" |
| 2 | 3.21e+00 | "Thriller" |
| 3 | 3.03e+00 | "Thriller" |
| 4 | 3.55e+00 | "Action" |
| 4 | 3.55e+00 | "Comedy" |
| 4 | 3.55e+00 | "Drama" |
+----------+----------+--------------+
showing top 10 rows
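The effect of explode can be mimicked in plain Python. This sketch is not Hail's implementation; the function below is a stand-in written for illustration, applied to two of the rows shown earlier. In this sketch, a row whose collection is empty produces no output rows.

```python
def explode(rows, field):
    """Return one copy of each row per element of row[field]."""
    exploded = []
    for row in rows:
        for element in row[field]:
            # Copy the row, replacing the collection with one element.
            exploded.append({**row, field: element})
    return exploded


rows = [
    {"movie_id": 1, "rating": 3.88,
     "genres": ["Animation", "Children's", "Comedy"]},
    {"movie_id": 3, "rating": 3.03, "genres": ["Thriller"]},
]

exploded = explode(rows, "genres")
for row in exploded:
    print(row)
```

The other fields (movie_id and rating) are repeated on every exploded row, which is why the mean rating of a movie with three genres contributes to three genre groups.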
Finally, we can group by genre and aggregate to get the mean rating per genre.
In [15]:
t = (t.group_by(t.genres)
.aggregate(rating = hl.agg.mean(t.rating)))
t.show(n=100)
2018-10-15 13:44:34 Hail: INFO: Ordering unsorted dataset with network shuffle
+---------------+----------+
| genres | rating |
+---------------+----------+
| str | float64 |
+---------------+----------+
| "Action" | 2.97e+00 |
| "Adventure" | 3.14e+00 |
| "Animation" | 3.30e+00 |
| "Children's" | 2.92e+00 |
| "Comedy" | 3.00e+00 |
| "Crime" | 3.21e+00 |
| "Documentary" | 3.23e+00 |
| "Drama" | 3.19e+00 |
| "Fantasy" | 2.85e+00 |
| "Film-Noir" | 3.55e+00 |
| "Horror" | 2.73e+00 |
| "Musical" | 3.38e+00 |
| "Mystery" | 3.34e+00 |
| "Romance" | 3.24e+00 |
| "Sci-Fi" | 3.17e+00 |
| "Thriller" | 3.14e+00 |
| "War" | 3.49e+00 |
| "Western" | 3.19e+00 |
+---------------+----------+
2018-10-15 13:44:34 Hail: INFO: Ordering unsorted dataset with network shuffle
Exercises
- What is the favorite movie for each occupation?
- What genres are rated most differently by men and women?