Scaling TQL
TQL is designed to be scalable out of the box via three main approaches: executing queries on a cluster with Spark, representative downsampling, and pre-computation with named variables.
from zeenk.tql import *
------------------------------------------------------------------
Version: 20.1.17-SNAPSHOT
Version Timestamp: 2021-08-06 21:43:03
Version Age: 4 days, 16 hours, 5 minutes, 56 seconds
Filesystem Root: /Users/zkozick/.noumena/files
Working Directory: /Users/zkozick/.noumena
Configuration File: /Users/zkozick/.noumena/noumena_conf.yml
Api Gateway: http://localhost:9000
Service Status: Icarus: ONLINE, Daedalus: ONLINE
Service Uptime: 1 days, 50 minutes, 8 seconds
------------------------------------------------------------------
TQL on Spark
Submitting Queries to a Spark Cluster
TQL queries are by default executed on a sample of 1000 timelines inside the API server to ensure interactive response times. TQL also ships with a distributed execution engine built on top of Apache Spark. To dispatch your query with Spark, simply provide the kwarg spark=True when you call submit():
resultset = select(
'bid.ad_size',
'bid.bid'
).from_events('lethe4')\
.limit(10)\
.submit(spark=True, wait=True)
SUCCESS: 1/1|██████████|, TOTAL ELAPSED=0:00:09, WAIT=0:00:00, PENDING=0:00:01, RUNNING=0:00:07
Spark Resultsets
TQL queries submitted to Spark also return Resultset objects; the difference is that each partition in the resultset contains a path to the data on local disk or a distributed filesystem, instead of the data itself:
resultset.partition('_default').data_path()
'file:/Users/zkozick/.noumena/files/datasets/2021-08-11/118428900272667/data/partition=_default'
Just like interactive resultsets, the partition data can be retrieved using the dataframe() functions:
df = resultset.dataframe() # a python list-of-lists.
df = resultset.spark_dataframe() # a spark dataframe
df = resultset.pandas_dataframe() # a pandas dataframe
The only difference is that for Spark resultsets, the data will be loaded from disk.
Retrieving Asynchronous Queries and ResultSets
Another difference: because TQL queries executed on Spark can be long-running operations, each is assigned a unique ID upon submission. That ID can be used at any point in the future, by another script or notebook run on the same machine, to retrieve the query and its results. Get the ID of the Resultset by calling get_id():
resultset.get_id()
459
You can then use this ID to retrieve the Query with the TQL function load_query(id), or the Resultset with load_resultset(id).
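For example, a later session on the same machine could reload both objects by ID. A minimal sketch; load_query() and load_resultset() are named above, but how the reloaded objects are subsequently used here is an assumption:
from zeenk.tql import *

# Reload the query and resultset produced by an earlier submission.
query = load_query(459)
resultset = load_resultset(459)

# The reloaded resultset behaves like any Spark resultset,
# loading partition data from disk:
df = resultset.pandas_dataframe()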
Connecting to an external Spark Cluster
By default, TQL uses an embedded, in-memory Spark cluster with a master URL of local[*]. If your TQL workloads exceed the capacity of a local, embedded cluster, you should configure TQL to connect to a running Spark cluster with a master URL of yarn or mesos. To do this, add a custom configuration file and change the Spark master to point to your desired cluster. For example:
pyspark:
  spark.master: yarn
For more info on customizing TQL’s use of Spark, refer to configuring PySpark.
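Other Spark properties can presumably be set under the same pyspark: block. A hedged sketch: spark.executor.memory and spark.executor.instances are standard Spark properties, but whether Noumena forwards arbitrary keys this way is an assumption, so consult the PySpark configuration docs for the supported set:
pyspark:
  spark.master: yarn
  # Assumed pass-through of standard Spark properties:
  spark.executor.memory: 8g
  spark.executor.instances: 16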
Reading and Writing To Cloud Storage
TQL supports reading and writing files to a cloud storage system such as Amazon S3 or Google Cloud Storage. If Noumena is installed on a cloud VM and that VM already has the necessary permissions, no further configuration is required. Read more about configuring AWS IAM Roles here and GCP IAM Roles here. If Noumena is installed on a machine outside of an IAM-managed environment, such as your local laptop, then to read and write cloud storage files you will need to provide your authentication credentials in the Noumena configuration file, located (by default) at ~/.noumena/noumena_conf.yml. Please refer to configuring a filesystem for Noumena.
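As an illustration, S3 credentials might be supplied as Hadoop filesystem properties. A hypothetical sketch: fs.s3a.access.key and fs.s3a.secret.key are standard Hadoop S3A property names, but the exact keys and config section Noumena expects are assumptions; the filesystem configuration docs are authoritative:
pyspark:
  # Assumed: credentials passed through as Hadoop S3A properties.
  spark.hadoop.fs.s3a.access.key: YOUR_ACCESS_KEY_ID
  spark.hadoop.fs.s3a.secret.key: YOUR_SECRET_ACCESS_KEY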
Downsampling ResultSets
Simple Downsampling
A common strategy for creating more performant machine learning pipelines is to downsample your data. A simple way to downsample is to introduce a where() clause with a random draw:
# use random() to only retain 20% of the data
select(
event_metadata()
)\
.from_events('lethe4')\
.where('random() < 0.2')\
.limit(10)
Query results:
partition "_default"_timeline_id | _event_id | _datetime | _type | |
---|---|---|---|---|
0 | u252 | 51539616375 | 2021-01-02 00:02:36 | bid |
1 | u252 | 51539616624 | 2021-01-02 00:30:55 | bid |
2 | u252 | 51539609120 | 2021-01-02 03:36:02 | activity |
3 | u252 | 51539618517 | 2021-01-02 05:54:20 | bid |
4 | u252 | 51539618808 | 2021-01-02 07:58:05 | bid |
5 | u252 | 51539622579 | 2021-01-02 18:32:04 | bid |
6 | u252 | 51539623640 | 2021-01-02 20:36:42 | bid |
7 | u252 | 8589935963 | 2021-01-04 17:31:33 | activity |
8 | u252 | 8589936254 | 2021-01-04 19:22:22 | activity |
9 | u252 | 8589936373 | 2021-01-04 20:22:25 | activity |
Downsampling By Attribute
Often you would prefer to downsample by a known key, such as the user ID, so that all of a sampled user's events are retained in your ResultSets. Here, MD5_MOD(timeline.id, 10) hashes each timeline ID into one of ten buckets, so keeping values below 2 retains roughly 20% of users along with all of their events:
select(
event_metadata()
)\
.from_events('lethe4')\
.where('MD5_MOD(timeline.id, 10) < 2')\
.limit(10)
Query results:
partition "_default"_timeline_id | _event_id | _datetime | _type | |
---|---|---|---|---|
0 | u252 | 252 | 2021-01-01 00:00:00 | user |
1 | u252 | 51539616375 | 2021-01-02 00:02:36 | bid |
2 | u252 | 51539616491 | 2021-01-02 00:16:41 | bid |
3 | u252 | 51539616624 | 2021-01-02 00:30:55 | bid |
4 | u252 | 51539608944 | 2021-01-02 00:57:47 | activity |
5 | u252 | 51539616852 | 2021-01-02 01:04:01 | bid |
6 | u252 | 51539617100 | 2021-01-02 01:37:26 | bid |
7 | u252 | 51539609033 | 2021-01-02 02:15:43 | activity |
8 | u252 | 51539617425 | 2021-01-02 02:18:30 | bid |
9 | u252 | 51539617722 | 2021-01-02 03:01:48 | bid |
Using the downsample_by() operator
TODO
Using the timeline_sample_rate() operator
TODO
Optimizing Computation with Scoped Variables
TQL allows you to pre-compute expensive variables and reference them in your features to reduce computation time. For example, perhaps you have a feature that requires the number of bids on each user’s timeline:
SIZE(FILTER(timeline.events, (x) -> x.type == "bid"))
In this case, we pre-compute this value with the timeline_var(name, expr) operator and reference it in the query’s expressions with TIMELINE_VAR(name):
select(
'TIMELINE_VAR("userBidCnt")',
)\
.from_events('lethe4')\
.timeline_var('userBidCnt', 'SIZE(FILTER(timeline.events, (x) -> x.type == "bid"))')\
.limit(3)
Query results:
partition "_default"_c0 | |
---|---|
0 | 431.0 |
1 | 431.0 |
2 | 431.0 |
If we have 1000 timelines in our project, each with 20 events, and our query has 3 features that use the bid count, the expression would otherwise be evaluated 20 × 3 = 60 times per timeline; pre-computing it with a TIMELINE_VAR() expression evaluates it once per timeline, speeding up computation of these features by 60x.
Similar constructs exist for global, non-timeline-specific variables with GLOBAL_VAR(), and event-specific variables with EVENT_VAR(). Extending the previous example, let’s use EVENT_VAR() to pre-compute the number of bids that occurred before each purchase event:
select(
'type',
'activity.conversion',
'EVENT_VAR("bidsBeforePurchase")',
)\
.from_events('lethe4')\
.where('type == "activity" and activity.conversion == "purchase"')\
.event_var('bidsBeforePurchase', 'SIZE(FILTER(timeline.events, (x) -> x.type == "bid" AND x.timestamp < timestamp))')\
.limit(3)
Query results:
partition "_default"_c0 | _c1 | _c2 | |
---|---|---|---|
0 | activity | purchase | 46.0 |
1 | activity | purchase | 55.0 |
2 | activity | purchase | 64.0 |
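GLOBAL_VAR() follows the same pattern for values that do not depend on any particular timeline or event. A minimal sketch, assuming (by analogy with timeline_var() and event_var()) that the query builder exposes a global_var(name, expr) operator; here a global sample rate is defined once and referenced in the where() clause:
select(
    event_metadata()
)\
.from_events('lethe4')\
.global_var('sampleRate', '0.2')\
.where('random() < GLOBAL_VAR("sampleRate")')\
.limit(3)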