Writing Queries

The primary purpose of TQL is to use the Query API to extract machine-learning-ready ResultSets from Timeline data. Each row in a ResultSet is an event on a timeline, and each column is a piece of data extracted from that event using the TQL Expression Language.

Query Syntax

Similar to SQL, every query starts with a select statement containing one or more TQL column expressions, a from clause that designates which set of timelines to select from, and optional operators that modify the output, such as where() or limit().

Each call to a Query operator modifies the state of the Query itself, and also returns the Query for subsequent chaining of operators, in the fluent style of programming.

Once the Query has been built up to the desired state, the submit() function is called, which returns a TQL ResultSet containing the data or pointer to the data, as well as statistics about the query execution.
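The fluent pattern can be sketched in plain Python. This is a hypothetical mini-builder illustrating the mutate-and-return-self convention, not the real zeenk.tql classes:

```python
# Minimal sketch of the fluent style: each operator mutates the object
# and returns self, so calls can be chained. (Hypothetical stand-in for
# the real Query class.)
class MiniQuery:
    def __init__(self, *columns):
        self.columns = list(columns)
        self.source = None
        self.row_limit = None

    def from_events(self, project):
        self.source = project
        return self  # returning self is what enables chaining

    def limit(self, n):
        self.row_limit = n
        return self

q = MiniQuery('type').from_events('lethe4').limit(1)
print(q.source, q.row_limit)  # lethe4 1
```

Because every operator returns the same object, a query can be built in one chained expression or incrementally across several statements.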

In the following example, we select the constant 1 from timelines of the demo project lethe4:

from zeenk.tql import *

# select emits a Query and submit emits a ResultSet
query = select(1).from_events('lethe4').limit(1)
resultset = query.submit()
print(type(query))
print(type(resultset))
<class 'zeenk.tql.query.Query'>
<class 'zeenk.tql.resultset.ResultSet'>

Query Columns

Each string passed to select() is actually a TQL expression, which describes how to translate timeline events into pieces of derived data. In the following example, we select the string literals “hello” and “world”. Since the selected data is constant, this query returns the same value for every row:

select("'hello'", "'world'")\
.from_events('lethe4')\
.limit(2) # run against only the first 2 events instead of the full ~200,000 rows

Query results:

partition "_default"
_c0 _c1
0 hello world
1 hello world
query produced 2 rows x 2 columns in 0.45 seconds

Expression Types

Expressions support several types: integer, long, float, string, null, boolean, array, and map.

select(
  123467890, 
  1.0000001, 
  "'string'", 
  None, 
  True, 
  '[1, 2, 3]', 
  "{'key':'value'}"
).from_events('lethe4').limit(2)

Query results:

partition "_default"
_c0 _c1 _c2 _c3 _c4 _c5 _c6
0 123467890 1.0000001 string None true [1,2,3] {key=value}
1 123467890 1.0000001 string None true [1,2,3] {key=value}
query produced 2 rows x 7 columns in 0.53 seconds

Selecting Event Attributes

Expressions are applied to every event. Each row corresponds to an event on a timeline; in a heterogeneous dataset, rows will have different event types.

timeline.id and type are universal variables that exist in all projects.

There are also project-specific variables like activity.* that belong to a particular project. For example, activity.conversion and bid.ad_size are specific to this timeline data and may not appear in every row.

select(
  'timeline.id',
  'timestamp',
  'type',
  'activity.conversion',
  'bid.ad_size',
  'user.gender'
).from_events('lethe4').limit(3)

Query results:

partition "_default"
_c0 _c1 _c2 _c3 _c4 _c5
0 u252 1609459200000 user None None male
1 u252 1609545756000 bid None big None
2 u252 1609546601000 bid None big None
query produced 3 rows x 6 columns in 0.61 seconds

The Select Wildcard Operator

The wildcard operator selects all event attributes.

select('*').from_events('lethe4').limit(2)

Query results:

partition "_default"
activity_activity_id activity_conversion activity_event_time activity_user_id bid_ad_size bid_bid bid_event_time bid_ghosted bid_request_id bid_user_id ... outcome timeline_id timestamp type user_age user_gender user_group user_region user_timestamp user_user_id
0 None None None None None None None None None None ... None u252 1609459200000 user 55plus male influenced south 2021-01-01 u252
1 None None None None big 1.0 2021-01-02 00:02:36 false r13260 u252 ... None u252 1609545756000 bid None None None None None None

2 rows × 26 columns

query produced 2 rows x 26 columns in 0.84 seconds

The wildcard operator can also select a subset of event attributes.

select('bid.*').from_events('lethe4').limit(3)

Query results:

partition "_default"
bid_ad_size bid_bid bid_event_time bid_ghosted bid_request_id bid_user_id bid_won
0 None None None None None None None
1 big 1.0 2021-01-02 00:02:36 false r13260 u252 true
2 big 1.0 2021-01-02 00:16:41 false r13376 u252 true
query produced 3 rows x 7 columns in 0.61 seconds

The Validation Operator

Using validate() is a quick way to check whether your query is valid. In the expression below, the expression validator expects a value to be assigned to a.

select('a=').from_events('lethe4').validate()
Encountered 1 expression compilation error: Line 1:2 no viable alternative at input 'a=' (in column _c0)
Expression error: Line 1:2 no viable alternative at input 'a=' (in column _c0)
a=
--^




---------------------------------------------------------------------------

TQLAnalysisException                      Traceback (most recent call last)

<ipython-input-8-c1a43c6aef08> in <module>
----> 1 select('a=').from_events('lethe4').validate()


~/src/noumena/python/zeenk-tql/zeenk/tql/query.py in validate(self)
    854         payload = self.json()
    855         if payload != self._last_validated_spec:
--> 856             icarus.post('query/validate', json_data=payload)
    857             self._last_validated_spec = payload
    858 


~/src/noumena/python/zeenk-tql/zeenk/tql/icarus.py in post(url, json_data, files_data, data, print_json)
     14         print(json.dumps(json_data))
     15     resp = requests.post(url, data=data, json=json_data, files=files_data)
---> 16     return _handle_response(url, resp, print_json=print_json)
     17 
     18 


~/src/noumena/python/zeenk-tql/zeenk/tql/icarus.py in _handle_response(url, resp, print_json)
     70             else:
     71                 print(reason)
---> 72             raise TQLAnalysisException(reason)
     73         else:
     74             msg = ''


TQLAnalysisException: Encountered 1 expression compilation error: Line 1:2 no viable alternative at input 'a=' (in column _c0)Expression error: Line 1:2 no viable alternative at input 'a=' (in column _c0)
a=
--^

The Timeline Limit Operator

Here we limit the number of timelines the query runs against. In the following query, we select a maximum of 2 events from a maximum of 1 timeline.

select('timeline.id').from_events('lethe4').limit(rows=2, timelines=1)

Query results:

partition "_default"
_c0
0 u252
1 u252
query produced 2 rows x 1 columns in 0.36 seconds

The Where Operator

The where operator only emits rows for which the given expression evaluates to true. It takes a truthy expression: anything non-zero, non-empty, and non-null evaluates to true. The example below selects events that are of type activity where the activity was a purchase.

select(
  'type',
  'activity.conversion'
).from_events('lethe4')\
.where('type == "activity" and activity.conversion == "purchase"')\
.limit(2)

Query results:

partition "_default"
_c0 _c1
0 activity purchase
1 activity purchase
query produced 2 rows x 2 columns in 0.43 seconds
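The truthiness rule (non-zero, non-empty, non-null is true) can be sketched in plain Python. The helper below is hypothetical, not part of the TQL API:

```python
def truthy(value):
    """Mimic where()'s rule: non-zero, non-empty, non-null is true."""
    if value is None:
        return False
    if isinstance(value, (int, float)):
        return value != 0
    if isinstance(value, (str, list, dict)):
        return len(value) > 0
    return bool(value)

# rows whose expression is falsy are dropped from the output
rows = [{'type': 'bid'}, {'type': None}, {'type': ''}]
kept = [r for r in rows if truthy(r['type'])]
print(kept)  # [{'type': 'bid'}]
```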

The From Timelines Operator

The from_timelines() operator is roughly equivalent to a “group by user/timeline_id” operation. The ResultSet of from_timelines() has exactly one row per timeline, and only timeline-level attributes can be selected. Aggregations over timeline.events are useful here, such as SIZE(timeline.events) or more complex transformations computed on a per-timeline basis. More details are provided in the Python API Doc.

select(
  # the timeline id
  'timeline.id',
  
  # the number of events on the timeline
  'SIZE(timeline.events)',
  
  # the second bid event for each user (arrays are 0-indexed)
  'events = FILTER(timeline.events, (x) -> x.bid); events[1]',

).from_timelines('lethe4')\
.limit(3)

Query results:

partition "_default"
_c0 _c1 _c2
0 u252 549.0 {"timestamp":1609546601000,"id":51539616491,"type":"bid","activity":null,"bid":{"bid":1.0,"ghosted":false,"won":true,"request_id":"r13376","ad_size":"big","event_time":"2021-01-02 00:16:41","user_id":"u252"},"user":null,"generated":null,"index":null,"duration":null,"generated_weight":null}
1 u577 802.0 {"timestamp":1609461055000,"id":25769817455,"type":"bid","activity":null,"bid":{"bid":1.0,"ghosted":true,"won":false,"request_id":"r267","ad_size":"big","event_time":"2021-01-01 00:30:55","user_id":"u577"},"user":null,"generated":null,"index":null,"duration":null,"generated_weight":null}
2 u582 461.0 {"timestamp":1609460058000,"id":25769817308,"type":"bid","activity":null,"bid":{"bid":1.0,"ghosted":false,"won":true,"request_id":"r120","ad_size":"small","event_time":"2021-01-01 00:14:18","user_id":"u582"},"user":null,"generated":null,"index":null,"duration":null,"generated_weight":null}
query produced 3 rows x 3 columns in 0.51 seconds
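The per-timeline aggregation can be sketched in plain Python, assuming a flat event stream of dicts keyed by a hypothetical timeline_id field (illustrative data, not the real API):

```python
from itertools import groupby

# from_timelines() effectively groups events by timeline id and emits
# one output row per timeline. Events here are pre-sorted by timeline_id,
# which groupby() requires.
events = [
    {'timeline_id': 'u252', 'type': 'user'},
    {'timeline_id': 'u252', 'type': 'bid'},
    {'timeline_id': 'u577', 'type': 'bid'},
]
rows = []
for tid, evs in groupby(events, key=lambda e: e['timeline_id']):
    evs = list(evs)
    # analogous to SIZE(timeline.events) and FILTER(...)[i]
    first_bid = next((e for e in evs if e['type'] == 'bid'), None)
    rows.append({'id': tid, 'size': len(evs), 'first_bid': first_bid})

print(rows)
```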

The Submit Operator

TQL queries submitted to Spark also return ResultSet objects, but each partition in the ResultSet contains a path to the data on local disk or a distributed file system, instead of the data itself. To read the data, see the load section of the ResultSet Guide.

select(
  'type',
  'timestamp'
).from_events('lethe4')\
.limit(4)\
.submit(spark=True) # we can submit jobs to a Spark cluster where the data is processed by worker nodes. 

Click to Abort Job

NOTE: Abort link will open the result of the call in a new tab/window

SUCCESS:  8/8|██████████|, TOTAL ELAPSED=0:00:17, WAIT=0:00:00, PENDING=0:00:00, RUNNING=0:00:16

Query results:

partition "_default"
key value
0 name _default
1 data_path file:/Users/smaguire/.tql/files/datasets/2021-08-30/37066623867876/data/partition=_default
2 row_count 32
3 columns [_c0, _c1]
4 positive_row_count 0
query produced 4 rows x 2 columns in 17.92 seconds

The Format Operator for Spark ResultSets

Using format() writes the result data as CSV, JSON, or Parquet.

select(
  'type',
  'timestamp',
  'activity.activity_id', 
  'activity.conversion'
).from_events('lethe4')\
.format('json')\
.limit(5)\
.submit(spark=True)

Click to Abort Job

NOTE: Abort link will open the result of the call in a new tab/window

SUCCESS:  8/8|██████████|, TOTAL ELAPSED=0:00:06, WAIT=0:00:00, PENDING=0:00:00, RUNNING=0:00:05

Query results:

partition "_default"
key value
0 name _default
1 data_path file:/Users/smaguire/.tql/files/datasets/2021-08-30/37093997083474/data/partition=_default
2 row_count 40
3 columns [_c0, _c1, _c2, _c3]
4 positive_row_count 0
query produced 5 rows x 4 columns in 7.3 seconds

Column

Columns have types. You must provide an expression, such as RANDOM(), 2.0, or a='5'. You can optionally provide a name, or cast the data by assigning the column a machine-learning type.

select(
  # A labeled column
  label('RANDOM()', 'my_label'),

  # A weight column is for a ML resultset. Values must be numeric.
  weight(2.0, name='my_weight_col'),

  # A tag column is a unique identifier for the row. Not required but highly encouraged.
  tag("'tag'", name='my_tag_col'),

  # A categorical feature
  categorical('type', name='my_categorical_col'),

  # A numerical feature. Can be a mixed numerical separated by ':'
  # ex: foo:1.3
  numerical("'foo:1.3'", name='my_numerical_col'),

  # A metadata column.
  metadata("'metadata'", name='my_metadata_col'),

  # Universal casting operation from string, dict, tuple, etc to column.
  # defaults to metadata if type is not provided
  col('YEAR(MILLIS(TO_DATETIME("2020-05-19 03:17:02.000")))', name='my_date_col', type='label')
)\
.from_events('lethe4').limit(3)

Query results:

partition "_default"
my_label my_weight_col my_tag_col my_categorical_col my_numerical_col my_metadata_col my_date_col
0 2020.0 2.0 tag user foo:1.300000000000000 metadata 2020.0
1 2020.0 2.0 tag bid foo:1.300000000000000 metadata 2020.0
2 2020.0 2.0 tag bid foo:1.300000000000000 metadata 2020.0
query produced 3 rows x 7 columns in 0.37 seconds

FeatureColumn

When a column is cast to a categorical() or numerical() feature, the following additional behavior applies:

  • query option global_min_total_count can be applied

  • query option apply_charset_filter can be applied

  • query option drop_empty_rows can be applied

  • query option expand_numerical_feature can be applied

  • query option drop_numerical_zero_features can be applied

  • column filters can be configured on the column

  • column value metrics are computed on the column

See the Query Options guide for more information on Query Options.

print(type(numerical('1')))
print(type(categorical('type')))
<class 'zeenk.tql.column.FeatureColumn'>
<class 'zeenk.tql.column.FeatureColumn'>

Filtering on Columns

Filtering is similar to where(), but instead of dropping an entire row it conditionally nulls out a single feature value. If a feature has only a small number of non-null values, you can use a filter like min_total_count to set a threshold on the minimum number of non-null rows. The min_total_count filter only applies to categorical and numerical features.

select(
  # min_total_count: the column must be non-null in at least 3 rows, or it will be nulled out for all rows
  categorical('bid.ad_size').filter(min_total_count=3)
)\
.from_events('lethe4')\
.where("type == 'bid'")\
.limit(3)

Query results:

partition "_default"
_c0
0 big
1 big
2 small
query produced 3 rows x 1 columns in 0.4 seconds
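The nulling behavior can be sketched in plain Python: if the column has fewer than min_total_count non-null values across the whole resultset, every value in that column is nulled out. The helper is hypothetical, not the real implementation:

```python
def apply_min_total_count(column_values, min_total_count):
    """Null out the whole column if it has too few non-null values.
    (Illustrative sketch of the min_total_count filter.)"""
    non_null = sum(1 for v in column_values if v is not None)
    if non_null < min_total_count:
        return [None] * len(column_values)
    return column_values

print(apply_min_total_count(['big', 'big', 'small'], 3))  # kept as-is
print(apply_min_total_count(['big', None, None], 3))      # nulled out
```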

Using event_metadata() and event_time()

These functions define sets of default columns. Notice that they provide both a column name and an expression that contains a variable.

event_metadata()
(Column<name="_timeline_id", expression="timeline.id">,
 Column<name="_event_id", expression="id">,
 Column<name="_datetime", expression="datetime(timestamp)">,
 Column<name="_type", expression="type">)
event_time()
(Column<name="_timestamp", expression="timestamp">,
 Column<name="_duration", expression="duration">)
select(event_metadata(), event_time()) \
  .from_events('lethe4') \
  .limit(3)

Query results:

partition "_default"
_timeline_id _event_id _datetime _type _timestamp _duration
0 u252 252 2021-01-01 00:00:00 user 1609459200000 None
1 u252 51539616375 2021-01-02 00:02:36 bid 1609545756000 None
2 u252 51539616491 2021-01-02 00:16:41 bid 1609546601000 None
query produced 3 rows x 6 columns in 0.4 seconds

The Partition By Operator

The partition_by() operator splits the data into n partitions. It takes an expression that emits an enumerated value, which assigns each row to a partition. In addition to simple operations like randomly assigning rows to a partition, you can use an attribute of the event to assign data to partitions more intelligently.

In all previous examples, the _default partition was used. We will now use partition_by() to create named partitions. The following partitions the data such that roughly 80% lands in the train partition and 20% in the test partition.

select(
  'type',
  'timestamp'
).from_events('lethe4')\
.partition_by('IF(RANDOM() < 0.8, "train", "test")')\
.limit(5)

Query results:

partition "train"
_c0 _c1
0 user 1609459200000
1 bid 1609545756000
2 bid 1609546601000
3 bid 1609547455000
4 activity 1609549067000
query produced 5 rows x 2 columns in 0.42 seconds

# ex: put events of each ad_size into their own partition
select(
  col('type', 'type'),
  col('bid.ad_size', 'size')
).from_events('lethe4')\
.where("type=='bid'")\
.partition_by('IF(bid.ad_size == "small", "small", "big") ')\
.limit(7)

Query results:

partition "small"
type size
0 bid small
1 bid small
partition "big"
type size
0 bid big
1 bid big
2 bid big
3 bid big
4 bid big
query produced 7 rows x 2 columns in 0.41 seconds
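The random train/test assignment can be sketched in plain Python (a hypothetical stand-in for the partition_by() expression, not the real evaluator):

```python
import random

random.seed(7)  # fixed seed so the sketch is reproducible
partitions = {'train': [], 'test': []}
for row_id in range(100):
    # equivalent of IF(RANDOM() < 0.8, "train", "test")
    name = 'train' if random.random() < 0.8 else 'test'
    partitions[name].append(row_id)

print(len(partitions['train']), len(partitions['test']))
```

Note the split is probabilistic: each run (or seed) yields approximately, not exactly, an 80/20 division.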

The External Timelines Operator

The external_timelines() operator can be used to run your query against custom timeline data you provide, instead of against the regular project timelines. Each timeline is a dictionary with two keys, id (string) and events (list), where each event is a dictionary with fields that match your Project’s schema. For example:

timeline = {
  'id':'my_custom_timeline_id',
  'events': [{
      'bid': {
        'ad_size': 'absolutely_enormous'
      }
    }
  ]
}
select(
  'bid.ad_size'
)\
.from_events('lethe4')\
.external_timelines([timeline]) # This is an array. Multiple timelines can be provided here.

Query results:

partition "_default"
column_name result
0 _c0 absolutely_enormous
query produced 1 rows x 1 columns in 0.37 seconds

The Downsample By Operator

The downsample_by() operator allows you to conditionally downsample a ResultSet according to its parameters. For more information see the API reference on downsampling.

# Downsampling the data such that we have twice as many negative records as positive records
select(
  label("IF(type == 'bid', 0, 1)")
).from_events('lethe4')\
.limit(3)\
.downsample_by(neg_pos_ratio=2, reweight=False)

Query results:

partition "_default"
_label
0 1.0
1 0.0
2 0.0
query produced 3 rows x 1 columns in 0.5 seconds
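The neg_pos_ratio behavior can be sketched in plain Python: keep all positive rows, and randomly keep at most ratio × positives negative rows. This is an illustrative approximation, not the real sampler, and the reweight option is ignored here:

```python
import random

random.seed(0)
labels = [1] * 10 + [0] * 100  # 10 positives, 100 negatives
positives = [l for l in labels if l == 1]
negatives = [l for l in labels if l == 0]

neg_pos_ratio = 2  # target: at most 2 negatives per positive
kept_negatives = random.sample(
    negatives, min(len(negatives), neg_pos_ratio * len(positives)))
downsampled = positives + kept_negatives
print(len(positives), len(kept_negatives))  # 10 20
```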

The Options Operator

More details about the options operator can be found in the Options Guide.

select(
  numerical('SUM_OPPORTUNITIES("c")')
).from_events('lethe4')\
.options(
  max_columns=2, # normally there are 7 kernels in the result. remove this line to see
  expand_numerical_features=True
).limit(3)

Query results:

partition "_default"
_c0_EQ_c-3d-e _c0_EQ_c-7d-e
0 1.0 1.0
1 1.0 1.0
2 1.0 1.0
query produced 3 rows x 2 columns in 0.4 seconds

The *_var Operators

These operators let you define scoped variables that can be referenced one or more times in your columns, via TIMELINE_VAR() and its siblings in TQL expressions.

There are 3 var operators:

  • event_var (computed once per event)

  • timeline_var (computed once per timeline)

  • global_var (computed once per ResultSet)

Below, the timeline variable first_bid is computed once per timeline (the first event of type bid) and then referenced from multiple columns.

# compute and memoize this expensive operation (avoid filtering all events for the first bid once per output row)
select(
  'TIMELINE_VAR("first_bid").type',  # retrieve precomputed timeline variable 'first_bid'
  'TIMELINE_VAR("first_bid").bid.request_id',
)\
.from_events("lethe4")\
.timeline_var('first_bid', 'FILTER(timeline.events, (x) -> x.type == "bid")[0]')\
.limit(3)

Query results:

partition "_default"
_c0 _c1
0 bid r13260
1 bid r13260
2 bid r13260
query produced 3 rows x 2 columns in 0.38 seconds
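The memoization idea can be sketched in plain Python: compute the variable once per timeline and reuse it for every event row (hypothetical data and helpers, not the real evaluator):

```python
def first_of_type(events, event_type):
    """Equivalent of FILTER(timeline.events, (x) -> x.type == ...)[0]."""
    return next((e for e in events if e['type'] == event_type), None)

timeline = {'id': 'u252', 'events': [
    {'type': 'user'},
    {'type': 'bid', 'request_id': 'r13260'},
    {'type': 'bid', 'request_id': 'r13376'},
]}

# timeline_var: computed once per timeline, not once per event row
first_bid = first_of_type(timeline['events'], 'bid')

# every event row references the same memoized value
rows = [(first_bid['type'], first_bid['request_id'])
        for _ in timeline['events']]
print(rows)  # [('bid', 'r13260'), ('bid', 'r13260'), ('bid', 'r13260')]
```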

The Opportunities Operator

Compare the results below. The default kernels are 5 minutes, 15 minutes, 1 hour, 4 hours, 1 day, 3 days, and 7 days (1 week).

In the first example, SUM_OPPORTUNITIES aggregates bid.ad_size via a time-difference-weighted, categorically grouped sum over events of type bid. It calculates the hypothesized value of each bid (opportunity) over each kernel (time window).

In the second example, we do the same for events of type activity, using an exponential-decay distribution to evaluate the “opportunity to treat” given the size of the ad.

# no opportunities() operator
select(
  event_metadata(),
  'bid.ad_size',
  'SUM_OPPORTUNITIES(bid.ad_size)'
).from_events('lethe4')\
.where('type == "bid"')\
.limit(3)

Query results:

partition "_default"
_timeline_id _event_id _datetime _type _c4 _c5
0 u252 51539616375 2021-01-02 00:02:36 bid big [big-5m-e:1,big-15m-e:1,big-1h-e:1,big-4h-e:1,big-1d-e:1,big-3d-e:1,big-7d-e:1]
1 u252 51539616491 2021-01-02 00:16:41 bid big [big-5m-e:1,big-15m-e:1,big-1h-e:1,big-4h-e:1,big-1d-e:1,big-3d-e:1,big-7d-e:1]
2 u252 51539616624 2021-01-02 00:30:55 bid small [small-5m-e:1,small-15m-e:1,small-1h-e:1,small-4h-e:1,small-1d-e:1,small-3d-e:1,small-7d-e:1]
query produced 3 rows x 6 columns in 0.41 seconds

# with opportunities() operator
select(
  event_metadata(),
  'bid.ad_size',
  'SUM_OPPORTUNITIES(bid.ad_size)'
).from_events('lethe4')\
.where('type == "activity"')\
.opportunities('type == "bid"', distribution='exp')\
.limit(3)

For a given outcome (activity) event, SUM_OPPORTUNITIES iterates over each preceding opportunity and weights it by the time delta between that opportunity and the outcome.

Query results:

partition "_default"
_timeline_id _event_id _datetime _type _c4 _c5
0 u252 51539608944 2021-01-02 00:57:47 activity None [small-1d-e:0.9815155646112473,small-4h-e:5.364563568032985,big-3d-e:0.6592795151474853,small-1h-e:15.33712028705239,small-5m-e:1.335929289646685,big-5m-e:0.08217085284903825,big-1h-e:21.66527965846466,big-7d-e:0.2843530487444983,big-1d-e:1.934265027978346,big-4h-e:9.823198048668358,big-15m-e:8.622850831823627,small-3d-e:0.3312667209981967,small-15m-e:16.01037709354400,small-7d-e:0.1424768864228655]
1 u252 51539609033 2021-01-02 02:15:43 activity None [small-1d-e:1.881235048884746,small-4h-e:8.327589681452498,big-3d-e:0.9778850917217536,small-1h-e:11.44939465428303,small-5m-e:0.0001706822397623980,big-5m-e:0.1361897575453083,big-1h-e:18.59069078435085,big-7d-e:0.4244786628919324,big-1d-e:2.806128853481813,big-4h-e:12.21484985754475,big-15m-e:7.526981406797988,small-3d-e:0.6531907947592189,small-15m-e:0.8947315945806531,small-7d-e:0.2832241682457595]
2 u252 51539609120 2021-01-02 03:36:02 activity None [small-1d-e:1.779180809669550,small-4h-e:5.959110843185616,big-3d-e:1.950292695357807,small-1h-e:3.002141144325396,big-5m-e:20.72168280223276,big-1h-e:44.28131765794558,big-7d-e:0.8479149361292611,big-1d-e:5.568841224025949,big-4h-e:23.96490852181065,big-15m-e:50.11016424082054,small-3d-e:0.6411589816600961,small-15m-e:0.004229481186244135,small-7d-e:0.2809764266104696]
query produced 3 rows x 6 columns in 0.49 seconds
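One plausible reading of the exponential kernels can be sketched as a decayed sum over preceding opportunities. The kernel form exp(-dt/tau) and the values below are illustrative assumptions, not the exact TQL formula:

```python
import math

def sum_opportunities_exp(deltas_ms, tau_ms):
    """Illustrative exponential-decay sum: each preceding opportunity
    contributes exp(-dt / tau), so recent opportunities count more.
    (Assumed form, not the exact TQL kernel.)"""
    return sum(math.exp(-dt / tau_ms) for dt in deltas_ms)

# two bids, 5 and 30 minutes before the activity, under a 1-hour kernel
deltas = [5 * 60_000, 30 * 60_000]
print(round(sum_opportunities_exp(deltas, 60 * 60_000), 3))
```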

This feature can be used to write validation test cases for your data, or, in conjunction with the PREDICT() function, to build a real-time scoring service.

The Union or Sampling Operator

The union or sampling operator injects new events into each timeline, only for the scope of the current query. The events are produced by the generate_events() function, which takes one or more timestamps and, optionally, attributes to set on the generated events.

The sampling operator is a powerful construct that can build panel datasets that would be difficult to create with SQL. An example of a query that is hard in SQL but easy in TQL: take a daily sample, count the number of events on the timeline, and count the number of events in the previous day.

Below, events are generated at epochs 0ms and 1000ms. The query selects the union of the project’s events and the generated events. A new type of conversion, simulated_purchase, has been injected into the selection and appears alongside the timeline’s real conversions. Generated events are of type generated_outcome by default.

select(
  'timeline.id',
  'type',
  'timestamp',
  'activity.activity_id', 
  'activity.conversion'
).from_events('lethe4')\
.union(generate_events('[0, 1000]', 
                       attribute_expressions={
                         'activity.conversion': '"simulated_purchase"',
                       }))\
.limit(5)

Query results:

partition "_default"
_c0 _c1 _c2 _c3 _c4
0 u252 generated_outcome 0 None simulated_purchase
1 u252 generated_outcome 1000 None simulated_purchase
2 u252 user 1609459200000 None None
3 u252 bid 1609545756000 None None
4 u252 bid 1609546601000 None None
query produced 5 rows x 5 columns in 0.41 seconds
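The union behavior can be sketched in plain Python: the generated events are merged into the timeline's real events by timestamp for the scope of the query (hypothetical event dicts, not the real API):

```python
# real events already on the timeline (illustrative timestamps)
real_events = [
    {'timestamp': 1609459200000, 'type': 'user'},
    {'timestamp': 1609545756000, 'type': 'bid'},
]
# equivalent of generate_events('[0, 1000]', attribute_expressions=...)
generated = [
    {'timestamp': ts, 'type': 'generated_outcome',
     'activity': {'conversion': 'simulated_purchase'}}
    for ts in (0, 1000)
]

# the query sees the union, ordered by timestamp
merged = sorted(real_events + generated, key=lambda e: e['timestamp'])
print([e['type'] for e in merged])
# ['generated_outcome', 'generated_outcome', 'user', 'bid']
```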


User-Defined Functions

More complex functions can be built on top of the expression language. User-defined functions are a superset of the expression language.

A simple example is provided below. For a more complete guide about User-Defined Functions refer to the User-Defined Functions Guide.

div2_func_src = '''
/**
* given an integer, long, or floating point number, return the value / 2
* @param x Number
* @return Double
*/

function div2(x) { 
  x / 2 
}
'''
select(
  'div2(4)'
).from_events('lethe4')\
.udf(div2_func_src)\
.limit(1)

Query results:

partition "_default"
column_name result
0 _c0 2.0
query produced 1 rows x 1 columns in 0.39 seconds