Scaling TQL

TQL is designed to be scalable out of the box, via three main approaches: executing queries on a cluster with Spark, representative downsampling, and pre-computation with named variables.

from zeenk.tql import *
------------------------------------------------------------------
           Version: 20.1.17-SNAPSHOT
 Version Timestamp: 2021-08-06 21:43:03
       Version Age: 4 days, 16 hours, 5 minutes, 56 seconds
   Filesystem Root: /Users/zkozick/.noumena/files
 Working Directory: /Users/zkozick/.noumena
Configuration File: /Users/zkozick/.noumena/noumena_conf.yml
       Api Gateway: http://localhost:9000
    Service Status: Icarus: ONLINE, Daedalus: ONLINE
    Service Uptime: 1 days, 50 minutes, 8 seconds
------------------------------------------------------------------

TQL on Spark

Submitting Queries to a Spark Cluster

TQL queries are executed by default on a sample of 1000 timelines inside the API server to ensure interactive response times. TQL also ships with a distributed execution engine built on top of Apache Spark. To dispatch your query to Spark, pass the keyword argument spark=True when you call submit():

resultset = select(
  'bid.ad_size', 
  'bid.bid'
).from_events('lethe4')\
.limit(10)\
.submit(spark=True, wait=True)


SUCCESS:  1/1|██████████|, TOTAL ELAPSED=0:00:09, WAIT=0:00:00, PENDING=0:00:01, RUNNING=0:00:07

Spark Resultsets

TQL queries submitted to Spark also return Resultset objects; the difference is that each partition in the resultset holds a path to its data on local disk or a distributed file system, instead of the data itself:

resultset.partition('_default').data_path()
'file:/Users/zkozick/.noumena/files/datasets/2021-08-11/118428900272667/data/partition=_default'

Just like interactive resultsets, the partition data can be retrieved using the dataframe() functions:

df = resultset.dataframe()        # a Python list of lists
df = resultset.spark_dataframe()  # a Spark DataFrame
df = resultset.pandas_dataframe() # a pandas DataFrame

The only difference is that for Spark resultsets, the data will be loaded from disk.

Retrieving Asynchronous Queries and ResultSets

Another difference is that, because TQL queries executed on Spark can be long-running operations, each is assigned a unique ID upon submission. That ID lets another script or notebook running on the same machine retrieve the query and its results at any point in the future. Get the ID of the Resultset by calling get_id():

resultset.get_id()
459

You can then use this ID to retrieve the Query with the TQL function load_query(id), or the Resultset with load_resultset(id).
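
For example, a later script or notebook session could reload the query and its results by ID (a minimal sketch reusing the ID returned above):

query = load_query(459)          # reload the Query submitted earlier
resultset = load_resultset(459)  # reload its Resultset
df = resultset.pandas_dataframe()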

Connecting to an external Spark Cluster

By default, TQL uses an embedded, in-memory Spark cluster with a master of local[*]. If your TQL workloads exceed the capacity of the local, embedded cluster, you should configure TQL to connect to a running Spark cluster with a master of yarn or mesos. To do this, add a custom configuration file and change the Spark master to point to your desired cluster. For example:

pyspark:
  spark.master: yarn

NOTE: If you plan on using TQL on a multi-machine cluster, files created by TQL must be accessible from all nodes, so you must also configure Noumena's filesystem root to use a distributed file system (see next section).

For more info on customizing TQL’s use of Spark, refer to configuring PySpark.

Reading and Writing To Cloud Storage

TQL supports reading and writing files in cloud storage systems such as Amazon S3 or Google Cloud Storage. If Noumena is installed on a cloud VM that already has the necessary permissions, no further configuration is required; read more about configuring AWS IAM Roles here and GCP IAM Roles here. If Noumena is installed on a machine outside of an IAM-managed environment, such as your local laptop, you will need to provide authentication credentials in the Noumena configuration file, located (by default) at ~/.noumena/noumena_conf.yml. Please refer to configuring a filesystem for Noumena.
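
As a rough sketch, assuming the pyspark section of noumena_conf.yml forwards arbitrary Spark/Hadoop properties (see configuring a filesystem for Noumena for the authoritative keys), S3 credentials could be supplied via the standard S3A properties:

pyspark:
  spark.hadoop.fs.s3a.access.key: YOUR_ACCESS_KEY   # assumption: forwarded to Spark's Hadoop config
  spark.hadoop.fs.s3a.secret.key: YOUR_SECRET_KEY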

Downsampling ResultSets

Simple Downsampling

A common strategy for creating more performant machine learning pipelines is to downsample your data. A simple way to downsample is to introduce a where() clause with a random draw:

# use random() to only retain 20% of the data
select(
  event_metadata()
)\
.from_events('lethe4')\
.where('random() < 0.2')\
.limit(10)

Query results:

partition "_default"
_timeline_id _event_id _datetime _type
0 u252 51539616375 2021-01-02 00:02:36 bid
1 u252 51539616624 2021-01-02 00:30:55 bid
2 u252 51539609120 2021-01-02 03:36:02 activity
3 u252 51539618517 2021-01-02 05:54:20 bid
4 u252 51539618808 2021-01-02 07:58:05 bid
5 u252 51539622579 2021-01-02 18:32:04 bid
6 u252 51539623640 2021-01-02 20:36:42 bid
7 u252 8589935963 2021-01-04 17:31:33 activity
8 u252 8589936254 2021-01-04 19:22:22 activity
9 u252 8589936373 2021-01-04 20:22:25 activity
query produced 10 rows x 4 columns in 0.33 seconds

Downsampling By Attribute

Often you would prefer to downsample based on a known key, such as the user ID, so that all of a user's events are retained in your ResultSets. Here, MD5_MOD(timeline.id, 10) < 2 keeps the timelines whose hashed ID falls in 2 of 10 buckets, roughly 20% of users:

select(
  event_metadata()
)\
.from_events('lethe4')\
.where('MD5_MOD(timeline.id, 10) < 2')\
.limit(10)

Query results:

partition "_default"
_timeline_id _event_id _datetime _type
0 u252 252 2021-01-01 00:00:00 user
1 u252 51539616375 2021-01-02 00:02:36 bid
2 u252 51539616491 2021-01-02 00:16:41 bid
3 u252 51539616624 2021-01-02 00:30:55 bid
4 u252 51539608944 2021-01-02 00:57:47 activity
5 u252 51539616852 2021-01-02 01:04:01 bid
6 u252 51539617100 2021-01-02 01:37:26 bid
7 u252 51539609033 2021-01-02 02:15:43 activity
8 u252 51539617425 2021-01-02 02:18:30 bid
9 u252 51539617722 2021-01-02 03:01:48 bid
query produced 10 rows x 4 columns in 0.34 seconds
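
The keyed draw works because the hash of the timeline ID is deterministic: every event for a given user maps to the same bucket, so either all of that user's events are kept or none are. A plain-Python illustration of the idea (this sketches the concept only; it is not TQL's MD5_MOD implementation):

import hashlib

def md5_mod(key: str, buckets: int) -> int:
    """Map a key to a stable bucket in [0, buckets) via its MD5 digest."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % buckets

# Keep users whose bucket is 0 or 1: roughly 2 in 10 users, with all of their events.
sampled_users = [uid for uid in ("u252", "u253", "u254") if md5_mod(uid, 10) < 2]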

Using the downsample_by() operator

TODO

Using the timeline_sample_rate() operator

TODO

Optimizing Computation with Scoped Variables

TQL allows you to pre-compute expensive variables, and reference them in your features to reduce computation time. For example, perhaps you have a feature that requires you to find the number of bids on each user’s timeline:

SIZE(FILTER(timeline.events, (x) -> x.type == "bid"))

In this case, we precompute the value once per timeline with the timeline_var(name, expr) operator, and then reference it in the query's expressions with TIMELINE_VAR(name):

select(
  'TIMELINE_VAR("userBidCnt")',
)\
.from_events('lethe4')\
.timeline_var('userBidCnt', 'SIZE(FILTER(timeline.events, (x) -> x.type == "bid"))')\
.limit(3)

Query results:

partition "_default"
_c0
0 431.0
1 431.0
2 431.0
query produced 3 rows x 1 columns in 0.28 seconds

If we have 1000 timelines in our project, each with 20 events, and our query has 3 features that use the bid count, the expression would otherwise be evaluated 3 × 20 = 60 times per timeline. Pre-computing the value once per timeline with TIMELINE_VAR() therefore speeds up computation of these features by 60x.

Similar constructs exist for global, non-timeline-specific variables with GLOBAL_VAR(), and for event-specific variables with EVENT_VAR(). Extending the previous example, let's use EVENT_VAR() to pre-compute the number of bids that occurred before each purchase event:

select(
  'type',
  'activity.conversion',
  'EVENT_VAR("bidsBeforePurchase")',
)\
.from_events('lethe4')\
.where('type == "activity" and activity.conversion == "purchase"')\
.event_var('bidsBeforePurchase', 'SIZE(FILTER(timeline.events, (x) -> x.type == "bid" AND x.timestamp < timestamp))')\
.limit(3)

Query results:

partition "_default"
_c0 _c1 _c2
0 activity purchase 46.0
1 activity purchase 55.0
2 activity purchase 64.0
query produced 3 rows x 3 columns in 0.31 seconds