Source code for hail.expr.types

import abc
import json
from collections import Mapping

import hail as hl
from hail import genetics
from hail.expr.type_parsing import type_grammar, type_node_visitor
from hail.genetics.reference_genome import reference_genome_type
from hail.typecheck import *
from hail.utils import Struct, Interval
from hail.utils.java import scala_object, jset, Env, escape_parsable

__all__ = [
    'dtype',
    'HailType',
    'hail_type',
    'is_container',
    'is_compound',
    'is_numeric',
    'is_primitive',
    'types_match',
    'tint',
    'tint32',
    'tint64',
    'tfloat',
    'tfloat32',
    'tfloat64',
    'tstr',
    'tbool',
    'tarray',
    'tset',
    'tdict',
    'tstruct',
    'ttuple',
    'tinterval',
    'tlocus',
    'tcall',
    'hts_entry_schema',
]


[docs]def dtype(type_str): r"""Parse a type from its string representation. Examples -------- >>> hl.dtype('int') dtype('int32') >>> hl.dtype('float') dtype('float64') >>> hl.dtype('array<int32>') dtype('array<int32>') >>> hl.dtype('dict<str, bool>') dtype('dict<str, bool>') >>> hl.dtype('struct{a: int32, `field with spaces`: int64}') dtype('struct{a: int32, `field with spaces`: int64}') Notes ----- This function is able to reverse ``str(t)`` on a :class:`.HailType`. The grammar is defined as follows: .. code-block:: text type = _ (array / set / dict / struct / tuple / interval / int64 / int32 / float32 / float64 / bool / str / call / str / locus) _ int64 = "int64" / "tint64" int32 = "int32" / "tint32" / "int" / "tint" float32 = "float32" / "tfloat32" float64 = "float64" / "tfloat64" / "tfloat" / "float" bool = "tbool" / "bool" call = "tcall" / "call" str = "tstr" / "str" locus = ("tlocus" / "locus") _ "[" identifier "]" array = ("tarray" / "array") _ "<" type ">" set = ("tset" / "set") _ "<" type ">" dict = ("tdict" / "dict") _ "<" type "," type ">" struct = ("tstruct" / "struct") _ "{" (fields / _) "}" tuple = ("ttuple" / "tuple") _ "(" ((type ("," type)*) / _) ")" fields = field ("," field)* field = identifier ":" type interval = ("tinterval" / "interval") _ "<" type ">" identifier = _ (simple_identifier / escaped_identifier) _ simple_identifier = ~"\w+" escaped_identifier = ~"`([^`\\\\]|\\\\.)*`" _ = ~"\s*" Parameters ---------- type_str : :obj:`str` String representation of type. Returns ------- :class:`.HailType` """ tree = type_grammar.parse(type_str)
return type_node_visitor.visit(tree)
[docs]class HailType(object): """ Hail type superclass. """ def __init__(self): self._cached_jtype = None super(HailType, self).__init__() def __repr__(self): s = str(self).replace("'", "\\'") return "dtype('{}')".format(s) @property def _jtype(self): if self._cached_jtype is None: self._cached_jtype = self._get_jtype() return self._cached_jtype @abc.abstractmethod def _eq(self, other): return def __eq__(self, other): return isinstance(other, HailType) and self._eq(other) @abc.abstractmethod def __str__(self): return def __ne__(self, other): return not self.__eq__(other) def __hash__(self): # FIXME this is a bit weird return 43 + hash(str(self))
[docs] def pretty(self, indent=0, increment=4): """Returns a prettily formatted string representation of the type. Parameters ---------- indent : :obj:`int` Spaces to indent. Returns ------- :obj:`str` """ l = [] self._pretty(l, indent, increment)
return ''.join(l) def _pretty(self, l, indent, increment): l.append(str(self)) @classmethod def _from_java(cls, jtype): return hl.dtype(jtype.toString()) @abc.abstractmethod def _typecheck(self, annotation): """ Raise an exception if the given annotation is not the appropriate type. :param annotation: value to check """ return def _to_json(self, x): converted = self._convert_to_json_na(x) return json.dumps(converted) def _convert_to_json_na(self, x): if x is None: return x else: return self._convert_to_json(x) def _convert_to_json(self, x): return x def _from_json(self, s): x = json.loads(s) return self._convert_from_json_na(x) def _convert_from_json_na(self, x): if x is None: return x else: return self._convert_from_json(x) def _convert_from_json(self, x):
return x hail_type = oneof(HailType, transformed((str, dtype))) class _tint32(HailType): """Hail type for signed 32-bit integers. Their values can range from :math:`-2^{31}` to :math:`2^{31} - 1` (approximately 2.15 billion). In Python, these are represented as :obj:`int`. """ def __init__(self): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TInt32Optional') super(_tint32, self).__init__() def _convert_to_py(self, annotation): return annotation def _convert_to_j(self, annotation): if annotation is not None: return Env.jutils().makeInt(annotation) else: return None def _typecheck(self, annotation): if annotation is not None: if not isinstance(annotation, int): raise TypeError("type 'tint32' expected Python 'int', but found type '%s'" % type(annotation)) elif not self.min_value <= annotation <= self.max_value: raise TypeError(f"Value out of range for 32-bit integer: " f"expected [{self.min_value}, {self.max_value}], found {annotation}") def __str__(self): return "int32" def _eq(self, other): return isinstance(other, _tint32) @property def min_value(self): return -(1 << 31) @property def max_value(self): return (1 << 31) - 1 class _tint64(HailType): """Hail type for signed 64-bit integers. Their values can range from :math:`-2^{63}` to :math:`2^{63} - 1`. In Python, these are represented as :obj:`int`. """ def __init__(self): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TInt64Optional') super(_tint64, self).__init__() def _convert_to_py(self, annotation): return annotation def _convert_to_j(self, annotation): raise NotImplementedError('int64 conversion from Python to JVM') def _typecheck(self, annotation): if annotation is not None: if not isinstance(annotation, int): raise TypeError("type 'int64' expected Python 'int', but found type '%s'" % type(annotation)) if not self.min_value <= annotation <= self.max_value: raise TypeError(f"Value out of range for 64-bit integer: " f"expected [{self.min_value}, {self.max_value}], found {annotation}") def __str__(self): return "int64" def _eq(self, other): return isinstance(other, _tint64) @property def min_value(self): return -(1 << 63) @property def max_value(self): return (1 << 63) - 1 class _tfloat32(HailType): """Hail type for 32-bit floating point numbers. In Python, these are represented as :obj:`float`. """ def __init__(self): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TFloat32Optional') super(_tfloat32, self).__init__() def _convert_to_py(self, annotation): return annotation def _convert_to_j(self, annotation): # if annotation: # return Env.jutils().makeFloat(annotation) # else: # return annotation # FIXME: This function is unsupported until py4j-0.10.4: https://github.com/bartdag/py4j/issues/255 raise NotImplementedError('float32 is currently unsupported in certain operations, use float64 instead') def _typecheck(self, annotation): if annotation is not None and not isinstance(annotation, (float, int)): raise TypeError("type 'float32' expected Python 'float', but found type '%s'" % type(annotation)) def __str__(self): return "float32" def _eq(self, other): return isinstance(other, _tfloat32) def _convert_from_json(self, x): return float(x) class _tfloat64(HailType): """Hail type for 64-bit floating point numbers. In Python, these are represented as :obj:`float`. """ def __init__(self): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TFloat64Optional') super(_tfloat64, self).__init__() def _convert_to_py(self, annotation): return annotation def _convert_to_j(self, annotation): if annotation is not None: return Env.jutils().makeDouble(annotation) else: return None def _typecheck(self, annotation): if annotation is not None and not isinstance(annotation, (float, int)): raise TypeError("type 'float64' expected Python 'float', but found type '%s'" % type(annotation)) def __str__(self): return "float64" def _eq(self, other): return isinstance(other, _tfloat64) def _convert_from_json(self, x): return float(x) class _tstr(HailType): """Hail type for text strings. In Python, these are represented as strings. """ def __init__(self): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TStringOptional') super(_tstr, self).__init__() def _convert_to_py(self, annotation): return annotation def _convert_to_j(self, annotation): return annotation def _typecheck(self, annotation): if annotation and not isinstance(annotation, str): raise TypeError("type 'str' expected Python 'str', but found type '%s'" % type(annotation)) def __str__(self): return "str" def _eq(self, other): return isinstance(other, _tstr) class _tbool(HailType): """Hail type for Boolean (``True`` or ``False``) values. In Python, these are represented as :obj:`bool`. """ def __init__(self): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TBooleanOptional') super(_tbool, self).__init__() def _convert_to_py(self, annotation): return annotation def _convert_to_j(self, annotation): return annotation def _typecheck(self, annotation): if annotation is not None and not isinstance(annotation, bool): raise TypeError("type 'bool' expected Python 'bool', but found type '%s'" % type(annotation)) def __str__(self): return "bool" def _eq(self, other): return isinstance(other, _tbool)
[docs]class tarray(HailType): """Hail type for variable-length arrays of elements. In Python, these are represented as :obj:`list`. Notes ----- Arrays contain elements of only one type, which is parameterized by `element_type`. Parameters ---------- element_type : :class:`.HailType` Element type of array. See Also -------- :class:`.ArrayExpression`, :class:`.CollectionExpression`, :func:`.array`, :ref:`sec-collection-functions` """ @typecheck_method(element_type=hail_type) def __init__(self, element_type): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TArray').apply(element_type._jtype, False) self._element_type = element_type super(tarray, self).__init__() @property def element_type(self): """Array element type. Returns ------- :class:`.HailType` Element type. """ return self._element_type def _convert_to_py(self, annotation): if annotation is not None: lst = Env.jutils().iterableToArrayList(annotation) return [self.element_type._convert_to_py(x) for x in lst] else: return None def _convert_to_j(self, annotation): if annotation is not None: return Env.jutils().arrayListToISeq( [self.element_type._convert_to_j(elt) for elt in annotation] ) else: return None def _typecheck(self, annotation): if annotation is not None: if not isinstance(annotation, list): raise TypeError("type 'array' expected Python 'list', but found type '%s'" % type(annotation)) for elt in annotation: self.element_type._typecheck(elt) def __str__(self): return "array<{}>".format(self.element_type) def _eq(self, other): return isinstance(other, tarray) and self.element_type == other.element_type def _pretty(self, l, indent, increment): l.append('array<') self.element_type._pretty(l, indent, increment) l.append('>') def _convert_from_json(self, x): return [self.element_type._convert_from_json_na(elt) for elt in x] def _convert_to_json(self, x):
return [self.element_type._convert_to_json_na(elt) for elt in x]
[docs]class tset(HailType): """Hail type for collections of distinct elements. In Python, these are represented as :obj:`set`. Notes ----- Sets contain elements of only one type, which is parameterized by `element_type`. Parameters ---------- element_type : :class:`.HailType` Element type of set. See Also -------- :class:`.SetExpression`, :class:`.CollectionExpression`, :func:`.set`, :ref:`sec-collection-functions` """ @typecheck_method(element_type=hail_type) def __init__(self, element_type): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TSet').apply(element_type._jtype, False) self._element_type = element_type super(tset, self).__init__() @property def element_type(self): """Set element type. Returns ------- :class:`.HailType` Element type. """ return self._element_type def _convert_to_py(self, annotation): if annotation is not None: lst = Env.jutils().iterableToArrayList(annotation) return set([self.element_type._convert_to_py(x) for x in lst]) else: return None def _convert_to_j(self, annotation): if annotation is not None: return jset( [self.element_type._convert_to_j(elt) for elt in annotation] ) else: return None def _typecheck(self, annotation): if annotation is not None: if not isinstance(annotation, set): raise TypeError("type 'set' expected Python 'set', but found type '%s'" % type(annotation)) for elt in annotation: self.element_type._typecheck(elt) def __str__(self): return "set<{}>".format(self.element_type) def _eq(self, other): return isinstance(other, tset) and self.element_type == other.element_type def _pretty(self, l, indent, increment): l.append('set<') self.element_type._pretty(l, indent, increment) l.append('>') def _convert_from_json(self, x): return {self.element_type._convert_from_json_na(elt) for elt in x} def _convert_to_json(self, x):
return [self.element_type._convert_to_json_na(elt) for elt in x]
[docs]class tdict(HailType): """Hail type for key-value maps. In Python, these are represented as :obj:`dict`. Notes ----- Dicts parameterize the type of both their keys and values with `key_type` and `value_type`. Parameters ---------- key_type: :class:`.HailType` Key type. value_type: :class:`.HailType` Value type. See Also -------- :class:`.DictExpression`, :func:`.dict`, :ref:`sec-collection-functions` """ @typecheck_method(key_type=hail_type, value_type=hail_type) def __init__(self, key_type, value_type): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TDict').apply( key_type._jtype, value_type._jtype, False) self._key_type = key_type self._value_type = value_type super(tdict, self).__init__() @property def key_type(self): """Dict key type. Returns ------- :class:`.HailType` Key type. """ return self._key_type @property def value_type(self): """Dict value type. Returns ------- :class:`.HailType` Value type. """ return self._value_type def _convert_to_py(self, annotation): if annotation is not None: lst = Env.jutils().iterableToArrayList(annotation) d = dict() for x in lst: d[self.key_type._convert_to_py(x._1())] = self.value_type._convert_to_py(x._2()) return d else: return None def _convert_to_j(self, annotation): if annotation is not None: return Env.jutils().javaMapToMap( {self.key_type._convert_to_j(k): self.value_type._convert_to_j(v) for k, v in annotation.items()} ) else: return None def _typecheck(self, annotation): if annotation: if not isinstance(annotation, dict): raise TypeError("type 'dict' expected Python 'dict', but found type '%s'" % type(annotation)) for k, v in annotation.items(): self.key_type._typecheck(k) self.value_type._typecheck(v) def __str__(self): return "dict<{}, {}>".format(self.key_type, self.value_type) def _eq(self, other): return isinstance(other, tdict) and self.key_type == other.key_type and self.value_type == other.value_type def _pretty(self, l, indent, increment): l.append('dict<') self.key_type._pretty(l, indent, increment) l.append(', ') self.value_type._pretty(l, indent, increment) l.append('>') def _convert_from_json(self, x): return {self.key_type._convert_from_json_na(elt['key']): self.value_type._convert_from_json_na(elt['value']) for elt in x} def _convert_to_json(self, x): return [{'key': self.key_type._convert_to_json(k),
'value':self.value_type._convert_to_json(v)} for k, v in x.items()]
[docs]class tstruct(HailType, Mapping): """Hail type for structured groups of heterogeneous fields. In Python, these are represented as :class:`.Struct`. Parameters ---------- field_types : keyword args of :class:`.HailType` Fields. See Also -------- :class:`.StructExpression`, :class:`.Struct` """ @typecheck_method(field_types=hail_type) def __init__(self, **field_types): self._field_types = field_types self._fields = tuple(field_types) self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TStruct').apply( list(self._fields), [t._jtype for f, t in self._field_types.items()], False) super(tstruct, self).__init__() @property def fields(self): """Struct fields. Returns ------- :obj:`tuple` of :class:`.Field` Struct fields. """ return self._fields def _convert_to_py(self, annotation): if annotation is not None: d = dict() for i, (f, t) in enumerate(self.items()): d[f] = t._convert_to_py(annotation.get(i)) return Struct(**d) else: return None def _convert_to_j(self, annotation): if annotation is not None: return scala_object(Env.hail().annotations, 'Annotation').fromSeq( Env.jutils().arrayListToISeq( [t._convert_to_j(annotation.get(f)) for f, t in self.items()])) else: return None def _typecheck(self, annotation): if annotation: if isinstance(annotation, Mapping): s = set(self) for f in annotation: if f not in s: raise TypeError("type '%s' expected fields '%s', but found fields '%s'" % (self, list(self), list(annotation))) for f, t in self.items(): t._typecheck(annotation.get(f)) else: raise TypeError("type 'struct' expected type Mapping (e.g. hail.genetics.Struct or dict), but found '%s'" % type(annotation)) @typecheck_method(item=oneof(int, str)) def __getitem__(self, item): if isinstance(item, str): return self._field_types[item] else: self._field_types[self._fields[item]] def __iter__(self): return iter(self._field_types) def __len__(self): return len(self._fields) def __str__(self): return "struct{{{}}}".format( ', '.join('{}: {}'.format(escape_parsable(f), str(t)) for f, t in self.items())) def _eq(self, other): return (isinstance(other, tstruct) and self._fields == other._fields and all(self[f] == other[f] for f in self._fields)) def _pretty(self, l, indent, increment): pre_indent = indent indent += increment l.append('struct {') for i, (f, t) in enumerate(self.items()): if i > 0: l.append(', ') l.append('\n') l.append(' ' * indent) l.append('{}: '.format(escape_parsable(f))) t._pretty(l, indent, increment) l.append('\n') l.append(' ' * pre_indent) l.append('}') def _convert_from_json(self, x): return Struct(**{f: t._convert_from_json_na(x.get(f)) for f, t in self.items()}) def _convert_to_json(self, x):
return {f: t._convert_to_json_na(x[f]) for f, t in self.items()}
[docs]class ttuple(HailType): """Hail type for tuples. In Python, these are represented as :obj:`tuple`. Parameters ---------- types: varargs of :class:`.HailType` Element types. See Also -------- :class:`.TupleExpression` """ @typecheck_method(types=hail_type) def __init__(self, *types): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TTuple').apply(map(lambda t: t._jtype, types), False) self._types = types super(ttuple, self).__init__() @property def types(self): """Tuple element types. Returns ------- :obj:`tuple` of :class:`.HailType` """ return self._types def _convert_to_py(self, annotation): if annotation is not None: return tuple(*(t._convert_to_py(annotation.get(i)) for i, t in enumerate(self.types))) else: return None def _convert_to_j(self, annotation): if annotation is not None: return Env.jutils().arrayListToISeq( [self.types[i]._convert_to_j(elt) for i, elt in enumerate(annotation)] ) else: return None def _typecheck(self, annotation): if annotation: if not isinstance(annotation, tuple): raise TypeError("type 'tuple' expected Python tuple, but found '%s'" % type(annotation)) if len(annotation) != len(self.types): raise TypeError("%s expected tuple of size '%i', but found '%s'" % (self, len(self.types), annotation)) for i, t in enumerate(self.types): t._typecheck((annotation[i])) def __str__(self): return "tuple({})".format(", ".join([str(t) for t in self.types])) def _eq(self, other): from operator import eq return isinstance(other, ttuple) and len(self.types) == len(other.types) and all( map(eq, self.types, other.types)) def _pretty(self, l, indent, increment): pre_indent = indent indent += increment l.append('tuple (') for i, t in enumerate(self.types): if i > 0: l.append(', ') l.append('\n') l.append(' ' * indent) t._pretty(l, indent, increment) l.append('\n') l.append(' ' * pre_indent) l.append(')') def _convert_from_json(self, x): return tuple(self.types[i]._convert_from_json_na(x[i]) for i in range(len(self.types))) def _convert_to_json(self, x):
return [self.types[i]._convert_to_json_na(x[i]) for i in range(len(self.types))] class _tcall(HailType): """Hail type for a diploid genotype. In Python, these are represented by :class:`.Call`. """ def __init__(self): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TCallOptional') super(_tcall, self).__init__() @typecheck_method(annotation=nullable(int)) def _convert_to_py(self, annotation): if annotation is not None: return genetics.Call._from_java(annotation) else: return None @typecheck_method(annotation=nullable(genetics.Call)) def _convert_to_j(self, annotation): if annotation is not None: return annotation._call else: return None def _typecheck(self, annotation): if annotation is not None and not isinstance(annotation, genetics.Call): raise TypeError("type 'call' expected Python hail.genetics.Call, but found %s'" % type(annotation)) def __str__(self): return "call" def _eq(self, other): return isinstance(other, _tcall) def _convert_from_json(self, x): return hl.Call._from_java(hl.Call._call_jobject().parse(x)) def _convert_to_json(self, x): return str(x)
[docs]class tlocus(HailType): """Hail type for a genomic coordinate with a contig and a position. In Python, these are represented by :class:`.Locus`. Parameters ---------- reference_genome: :class:`.ReferenceGenome` or :obj:`str` Reference genome to use. See Also -------- :class:`.LocusExpression`, :func:`.locus`, :func:`.parse_locus`, :class:`.Locus` """ @typecheck_method(reference_genome=reference_genome_type) def __init__(self, reference_genome='default'): self._rg = reference_genome self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TLocus').apply(self._rg._jrep, False) super(tlocus, self).__init__() def _convert_to_py(self, annotation): if annotation is not None: return genetics.Locus._from_java(annotation, self._rg) else: return None def _convert_to_j(self, annotation): if annotation is not None: return annotation._jrep else: return None def _typecheck(self, annotation): if annotation is not None: if not isinstance(annotation, genetics.Locus): raise TypeError("type '{}' expected Python hail.genetics.Locus, but found '{}'" .format(self, type(annotation))) if not self.reference_genome == annotation.reference_genome: raise TypeError("type '{}' encountered Locus with reference genome {}" .format(self, repr(annotation.reference_genome))) def __str__(self): return "locus<{}>".format(escape_parsable(str(self.reference_genome))) def _eq(self, other): return isinstance(other, tlocus) and self.reference_genome == other.reference_genome @property def reference_genome(self): """Reference genome. Returns ------- :class:`.ReferenceGenome` Reference genome. """ if self._rg is None: self._rg = hl.default_reference() return self._rg def _pretty(self, l, indent, increment): l.append('locus<{}>'.format(escape_parsable(self.reference_genome.name))) def _convert_from_json(self, x): return genetics.Locus(x['contig'], x['position'], reference_genome=self.reference_genome) def _convert_to_json(self, x):
return {'contig': x.contig, 'position': x.position}
[docs]class tinterval(HailType): """Hail type for intervals of ordered values. In Python, these are represented by :class:`.Interval`. Parameters ---------- point_type: :class:`.HailType` Interval point type. See Also -------- :class:`.IntervalExpression`, :class:`.Interval`, :func:`.interval`, :func:`.parse_locus_interval` """ @typecheck_method(point_type=hail_type) def __init__(self, point_type): self._get_jtype = lambda: scala_object(Env.hail().expr.types, 'TInterval').apply(self.point_type._jtype, False) self._point_type = point_type super(tinterval, self).__init__() @property def point_type(self): """Interval point type. Returns ------- :class:`.HailType` Interval point type. """ return self._point_type def _convert_to_py(self, annotation): if annotation is not None: return Interval._from_java(annotation, self._point_type) else: return None def _convert_to_j(self, annotation): if annotation is not None: return annotation._jrep else: return None def _typecheck(self, annotation): if annotation is not None: if not isinstance(annotation, Interval): raise TypeError("type '{}' expected Python hail.utils.Interval, but found {}" .format(self, type(annotation))) if annotation.point_type != self.point_type: raise TypeError("type '{}' encountered Interval with point type {}" .format(self, repr(annotation.point_type))) def __str__(self): return "interval<{}>".format(str(self.point_type)) def _eq(self, other): return isinstance(other, tinterval) and self.point_type == other.point_type def _pretty(self, l, indent, increment): l.append('interval<') self.point_type._pretty(l, indent, increment) l.append('>') def _convert_from_json(self, x): return Interval(self.point_type._convert_from_json_na(x['start']), self.point_type._convert_from_json_na(x['end']), x['includeStart'], x['includeEnd']) def _convert_to_json(self, x): return {'start': self.point_type._convert_to_json_na(x.start), 'end': self.point_type._convert_to_json_na(x.end), 'includeStart': x.includes_start,
'includeEnd': x.includes_end} tint32 = _tint32() """Hail type for signed 32-bit integers. Their values can range from :math:`-2^{31}` to :math:`2^{31} - 1` (approximately 2.15 billion). In Python, these are represented as :obj:`int`. See Also -------- :class:`.Int32Expression`, :func:`.int`, :func:`.int32` """ tint64 = _tint64() """Hail type for signed 64-bit integers. Their values can range from :math:`-2^{63}` to :math:`2^{63} - 1`. In Python, these are represented as :obj:`int`. See Also -------- :class:`.Int64Expression`, :func:`.int64` """ tint = tint32 """Alias for :py:data:`.tint32`.""" tfloat32 = _tfloat32() """Hail type for 32-bit floating point numbers. In Python, these are represented as :obj:`float`. See Also -------- :class:`.Float32Expression`, :func:`.float64` """ tfloat64 = _tfloat64() """Hail type for 64-bit floating point numbers. In Python, these are represented as :obj:`float`. See Also -------- :class:`.Float64Expression`, :func:`.float`, :func:`.float64` """ tfloat = tfloat64 """Alias for :py:data:`.tfloat64`.""" tstr = _tstr() """Hail type for text strings. In Python, these are represented as strings. See Also -------- :class:`.StringExpression`, :func:`.str` """ tbool = _tbool() """Hail type for Boolean (``True`` or ``False``) values. In Python, these are represented as :obj:`bool`. See Also -------- :class:`.BooleanExpression`, :func:`.bool` """ tcall = _tcall() """Hail type for a diploid genotype. In Python, these are represented by :class:`.Call`. See Also -------- :class:`.CallExpression`, :class:`.Call`, :func:`.call`, :func:`.parse_call`, :func:`.unphased_diploid_gt_index_call` """ hts_entry_schema = tstruct(GT=tcall, AD=tarray(tint32), DP=tint32, GQ=tint32, PL=tarray(tint32)) _numeric_types = {tbool, tint32, tint64, tfloat32, tfloat64} _primitive_types = _numeric_types.union({tstr}) @typecheck(t=HailType) def is_numeric(t) -> bool: return t in _numeric_types @typecheck(t=HailType) def is_primitive(t) -> bool: return t in _primitive_types @typecheck(t=HailType) def is_container(t) -> bool: return (isinstance(t, tarray) or isinstance(t, tset) or isinstance(t, tdict)) @typecheck(t=HailType) def is_compound(t) -> bool: return (is_container(t) or isinstance(t, tstruct) or isinstance(t, ttuple)) def types_match(left, right) -> bool: return (len(left) == len(right) and all(map(lambda lr: lr[0].dtype == lr[1].dtype, zip(left, right)))) import pprint _old_printer = pprint.PrettyPrinter class TypePrettyPrinter(pprint.PrettyPrinter): def _format(self, object, stream, indent, allowance, context, level): if isinstance(object, HailType): stream.write(object.pretty(self._indent_per_level)) else: return _old_printer._format(self, object, stream, indent, allowance, context, level) pprint.PrettyPrinter = TypePrettyPrinter # monkey-patch pprint