import hail as hl
from .java import FatalError, Env, info
from .misc import local_path_uri, new_local_temp_dir
import os
import zipfile
from urllib.request import urlretrieve
from typing import Optional
__all__ = [
'get_1kg',
'get_movie_lens'
]
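# Public download URLs for the tutorial datasets. The 1KG files are served
# from a Hail-maintained GCS bucket; MovieLens-100k comes from GroupLens.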
resources = {
'1kg_annotations': 'https://storage.googleapis.com/hail-tutorial/1kg_annotations.txt',
'1kg_matrix_table': 'https://storage.googleapis.com/hail-tutorial/1kg.vcf.bgz',
'movie_lens_100k': 'http://files.grouplens.org/datasets/movielens/ml-100k.zip',
}
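# Local temporary directory, created lazily and shared across downloads.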
tmp_dir: Optional[str] = None
def init_temp_dir():
global tmp_dir
if tmp_dir is None:
tmp_dir = new_local_temp_dir()
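# Create `path` through the JVM filesystem utilities, failing loudly if the
# directory cannot be created.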
def _mkdir(jhc, path):
if not Env.jutils().dirExists(jhc, path):
r = Env.jutils().mkdir(jhc, path)
if not r:
raise IOError(f'could not mkdir {path}')
def get_1kg(output_dir, overwrite: bool = False):
"""Download subset of the `1000 Genomes <http://www.internationalgenome.org/>`__
dataset and sample annotations.
Notes
-----
The download is about 15M.
Parameters
----------
output_dir
Directory in which to write data.
overwrite
If ``True``, overwrite any existing files/directories at `output_dir`.
"""
jhc = Env.hc()._jhc
_mkdir(jhc, output_dir)
matrix_table_path = os.path.join(output_dir, '1kg.mt')
annotations_path = os.path.join(output_dir, '1kg_annotations.txt')
if (overwrite
or not Env.jutils().dirExists(jhc, matrix_table_path)
or not Env.jutils().fileExists(jhc, annotations_path)):
init_temp_dir()
tmp_vcf = os.path.join(tmp_dir, '1kg.vcf.bgz')
source = resources['1kg_matrix_table']
info(f'downloading 1KG VCF ...\n'
f' Source: {source}')
        urlretrieve(source, tmp_vcf)
cluster_readable_vcf = Env.jutils().copyToTmp(jhc, local_path_uri(tmp_vcf), 'vcf')
info('importing VCF and writing to matrix table...')
hl.import_vcf(cluster_readable_vcf, min_partitions=16).write(matrix_table_path, overwrite=True)
tmp_annot = os.path.join(tmp_dir, '1kg_annotations.txt')
source = resources['1kg_annotations']
info(f'downloading 1KG annotations ...\n'
f' Source: {source}')
urlretrieve(source, tmp_annot)
hl.hadoop_copy(local_path_uri(tmp_annot), annotations_path)
info('Done!')
else:
info('1KG files found')
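
# A minimal usage sketch (assumes this module is exposed as ``hl.utils``,
# as in the Hail tutorials; the 'data/' output path is hypothetical):
#
#     import hail as hl
#     hl.init()
#     hl.utils.get_1kg('data/')
#     mt = hl.read_matrix_table('data/1kg.mt')
#     annotations = hl.import_table('data/1kg_annotations.txt', impute=True)
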
def get_movie_lens(output_dir, overwrite: bool = False):
"""Download public Movie Lens dataset.
Notes
-----
The download is about 6M.
See the
`MovieLens website <https://grouplens.org/datasets/movielens/100k/>`__
for more information about this dataset.
Parameters
----------
output_dir
Directory in which to write data.
overwrite
If ``True``, overwrite existing files/directories at those locations.
"""
jhc = Env.hc()._jhc
_mkdir(jhc, output_dir)
paths = [os.path.join(output_dir, x) for x in ['movies.ht', 'ratings.ht', 'users.ht']]
if overwrite or any(not Env.jutils().dirExists(jhc, f) for f in paths):
init_temp_dir()
source = resources['movie_lens_100k']
tmp_path = os.path.join(tmp_dir, 'ml-100k.zip')
info(f'downloading MovieLens-100k data ...\n'
f' Source: {source}')
urlretrieve(source, tmp_path)
with zipfile.ZipFile(tmp_path, 'r') as z:
z.extractall(tmp_dir)
        user_table_path = os.path.join(tmp_dir, 'ml-100k', 'u.user')
        movie_table_path = os.path.join(tmp_dir, 'ml-100k', 'u.item')
        ratings_table_path = os.path.join(tmp_dir, 'ml-100k', 'u.data')
        assert os.path.exists(user_table_path)
        assert os.path.exists(movie_table_path)
        assert os.path.exists(ratings_table_path)
user_cluster_readable = Env.jutils().copyToTmp(jhc, local_path_uri(user_table_path), 'txt')
movie_cluster_readable = Env.jutils().copyToTmp(jhc, local_path_uri(movie_table_path), 'txt')
ratings_cluster_readable = Env.jutils().copyToTmp(jhc, local_path_uri(ratings_table_path), 'txt')
        movies_path, ratings_path, users_path = paths
genres = ['Action', 'Adventure', 'Animation',
"Children's", 'Comedy', 'Crime',
'Documentary', 'Drama', 'Fantasy',
'Film-Noir', 'Horror', 'Musical',
'Mystery', 'Romance', 'Sci-Fi',
'Thriller', 'War', 'Western']
        # Helper functions for importing the MovieLens tables.
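        # Turn a 0/1 genre flag into a one- or zero-element array holding the
        # field's name, so the flags can be flattened into a genre list.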
def field_to_array(ds, field):
return hl.cond(ds[field] != 0, hl.array([field]), hl.empty_array(hl.tstr))
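        # Concatenate the per-flag arrays into a single array of set genres.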
def fields_to_array(ds, fields):
return hl.flatten(hl.array([field_to_array(ds, f) for f in fields]))
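        # Positionally rename a table's row fields to readable names.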
def rename_columns(ht, new_names):
            return ht.rename(dict(zip(ht.row, new_names)))
info(f'importing users table and writing to {users_path} ...')
users = rename_columns(
hl.import_table(user_cluster_readable, key=['f0'], no_header=True, impute=True, delimiter='|'),
['id', 'age', 'sex', 'occupation', 'zipcode'])
users.write(users_path, overwrite=True)
info(f'importing movies table and writing to {movies_path} ...')
movies = hl.import_table(movie_cluster_readable, key=['f0'], no_header=True, impute=True, delimiter='|')
movies = rename_columns(movies,
['id', 'title', 'release date', 'video release date', 'IMDb URL', 'unknown'] + genres)
movies = movies.drop('release date', 'video release date', 'unknown', 'IMDb URL')
movies = movies.transmute(genres=fields_to_array(movies, genres))
movies.write(movies_path, overwrite=True)
info(f'importing ratings table and writing to {ratings_path} ...')
ratings = hl.import_table(ratings_cluster_readable, no_header=True, impute=True)
ratings = rename_columns(ratings,
['user_id', 'movie_id', 'rating', 'timestamp'])
ratings = ratings.drop('timestamp')
ratings.write(ratings_path, overwrite=True)
else:
        info('MovieLens files found!')
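
# A minimal usage sketch (same assumptions as the get_1kg example above;
# the 'data/' output path is hypothetical):
#
#     hl.utils.get_movie_lens('data/')
#     movies = hl.read_table('data/movies.ht')
#     ratings = hl.read_table('data/ratings.ht')
#     users = hl.read_table('data/users.ht')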