# Scaling TQL

TQL is designed to scale out of the box, via three main approaches: executing queries on a cluster with Spark, representative downsampling, and pre-computation with named variables.

```python
from zeenk.tql import *
```

------------------------------------------------------------------
Version: 20.1.17-SNAPSHOT
Version Timestamp: 2021-08-06 21:43:03
Version Age: 4 days, 16 hours, 5 minutes, 56 seconds
Filesystem Root: /Users/zkozick/.noumena/files
Working Directory: /Users/zkozick/.noumena
Configuration File: /Users/zkozick/.noumena/noumena_conf.yml
Api Gateway: http://localhost:9000
Service Status: Icarus: ONLINE, Daedalus: ONLINE
Service Uptime: 1 days, 50 minutes, 8 seconds
------------------------------------------------------------------

## TQL on Spark

### Submitting Queries to a Spark Cluster

By default, TQL queries are executed on a sample of 1000 timelines inside the API server to ensure interactive response times. TQL also ships with a distributed execution engine built on top of Apache Spark. To dispatch your query with Spark, provide the kwarg `spark=True` when you call `submit()`:

```python
resultset = select(
    'bid.ad_size',
    'bid.bid'
).from_events('lethe4')\
    .limit(10)\
    .submit(spark=True, wait=True)
```


SUCCESS: 1/1|██████████|, TOTAL ELAPSED=0:00:09, WAIT=0:00:00, PENDING=0:00:01, RUNNING=0:00:07

### Spark Resultsets

TQL queries submitted to Spark also return Resultset objects, but with one difference: each partition in the resultset holds a path to its data on local disk or a distributed file system, instead of the data itself:

```python
resultset.partition('_default').data_path()
```

'file:/Users/zkozick/.noumena/files/datasets/2021-08-11/118428900272667/data/partition=_default'

Just like interactive resultsets, the partition data can be retrieved using the dataframe() functions:

```python
df = resultset.dataframe()         # a python list-of-lists
df = resultset.spark_dataframe()   # a spark dataframe
df = resultset.pandas_dataframe()  # a pandas dataframe
```

The only difference is that for Spark resultsets, the data will be loaded from disk.

### Retrieving Asynchronous Queries and ResultSets

Because TQL queries executed on Spark can be long-running operations, each is assigned a unique ID upon submission. That ID can be used at any point in the future, from another script or notebook run on the same machine, to retrieve the query and its results. Get the ID of the Resultset by calling `get_id()`:

```python
resultset.get_id()
```

459

You can then use this ID to retrieve the Query with the TQL function `load_query(id)`, or the `Resultset` with `load_resultset(id)` (a short sketch of this workflow appears below, after the note on multi-machine clusters).

### Connecting to an external Spark Cluster

By default, TQL uses an embedded, in-memory Spark cluster with a master of `local[*]`. If your TQL workloads exceed the capacity of a local, embedded cluster, you should configure TQL to connect to a running Spark cluster with a master of `yarn` or `mesos`. To do this, add a custom configuration file and change the Spark master to point to your desired cluster. For example:

```
pyspark:
  spark.master: yarn
```
NOTE: If you plan on using TQL on a multi-machine cluster, files created by TQL must be accessible from all nodes, so you must also configure Noumena's filesystem root to use a distributed file system (see next section).
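As a minimal sketch of the asynchronous workflow described above (assuming the Spark query with ID 459 still exists on this machine), a later script or notebook could re-attach to it like this:

```python
from zeenk.tql import *

# Re-attach to a previously submitted Spark query from a new session.
# 459 is whatever resultset.get_id() returned at submission time.
query = load_query(459)          # the original query definition
resultset = load_resultset(459)  # its resultset, with paths to the data

# the data is read back from the partition paths on disk
df = resultset.pandas_dataframe()
```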
For more info on customizing TQL's use of Spark, refer to [configuring PySpark](configuration.html#pyspark).

### Reading and Writing To Cloud Storage

TQL supports reading and writing files on a cloud storage system such as Amazon S3 or Google Cloud Storage. If Noumena is installed on a cloud VM and that VM already has the necessary permissions, no further configuration is required. Read more about configuring AWS IAM Roles [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html) and GCP IAM Roles [here](https://cloud.google.com/iam/docs/understanding-service-accounts).

If Noumena is installed on a machine outside of an IAM-managed environment, such as your local laptop, you will need to provide your authentication credentials in the Noumena configuration file, located (by default) at `~/.noumena/noumena_conf.yml`, in order to read and write cloud storage files. Please refer to [configuring a filesystem for Noumena](configuration.html#filesystem).

## Downsampling ResultSets

### Simple Downsampling

A common strategy for building more performant machine learning pipelines is to downsample your data. A simple way to downsample is to add a `where()` clause with a random draw:

```python
# use random() to only retain roughly 20% of the data
select(
    event_metadata()
)\
.from_events('lethe4')\
.where('random() < 0.2')\
.limit(10)
```

Query results:

partition "_default"
_timeline_id _event_id _datetime _type
0 u252 51539616375 2021-01-02 00:02:36 bid
1 u252 51539616624 2021-01-02 00:30:55 bid
2 u252 51539609120 2021-01-02 03:36:02 activity
3 u252 51539618517 2021-01-02 05:54:20 bid
4 u252 51539618808 2021-01-02 07:58:05 bid
5 u252 51539622579 2021-01-02 18:32:04 bid
6 u252 51539623640 2021-01-02 20:36:42 bid
7 u252 8589935963 2021-01-04 17:31:33 activity
8 u252 8589936254 2021-01-04 19:22:22 activity
9 u252 8589936373 2021-01-04 20:22:25 activity
query produced 10 rows x 4 columns in 0.33 seconds
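If you want to sanity-check the effective sampling rate, one rough approach is to compare row counts with and without the filter. This is a sketch rather than a prescribed TQL idiom; it assumes that calling `submit()` with no arguments runs the query interactively and returns a Resultset, as the Spark example above suggests:

```python
# Rough check of the sampling rate (assumption: plain .submit() executes the
# query on the interactive sample and dataframe() returns a list of rows).
full = select(event_metadata()).from_events('lethe4').submit()
sampled = select(event_metadata()).from_events('lethe4').where('random() < 0.2').submit()

rate = len(sampled.dataframe()) / len(full.dataframe())
print(f"observed sampling rate: {rate:.2f}")  # expected to be close to 0.2
```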

### Downsampling By Attribute

Often you would prefer to downsample based on a known key, such as the user id. This way, you retain all of the events for each sampled user in your ResultSet:

```python
select(
    event_metadata()
)\
.from_events('lethe4')\
.where('MD5_MOD(timeline.id, 10) < 2')\
.limit(10)
```

Query results:

partition "_default"
_timeline_id _event_id _datetime _type
0 u252 252 2021-01-01 00:00:00 user
1 u252 51539616375 2021-01-02 00:02:36 bid
2 u252 51539616491 2021-01-02 00:16:41 bid
3 u252 51539616624 2021-01-02 00:30:55 bid
4 u252 51539608944 2021-01-02 00:57:47 activity
5 u252 51539616852 2021-01-02 01:04:01 bid
6 u252 51539617100 2021-01-02 01:37:26 bid
7 u252 51539609033 2021-01-02 02:15:43 activity
8 u252 51539617425 2021-01-02 02:18:30 bid
9 u252 51539617722 2021-01-02 03:01:48 bid
query produced 10 rows x 4 columns in 0.34 seconds
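The reason whole users are kept together is that a given timeline id always hashes to the same bucket. The snippet below is an illustrative Python stand-in for what an expression like `MD5_MOD(timeline.id, 10)` is assumed to compute; TQL's exact hashing details may differ:

```python
import hashlib

def md5_mod(key: str, buckets: int) -> int:
    """Hash a key with MD5 and reduce it modulo `buckets` (illustrative only)."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % buckets

# The same timeline id always maps to the same bucket, so keeping
# buckets 0 and 1 retains roughly 20% of users, with all of their events.
print(md5_mod("u252", 10))
```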

### Using the downsample_by() operator

TODO

### Using the timeline_sample_rate() operator

TODO

## Optimizing Computation with Scoped Variables

TQL allows you to pre-compute expensive variables and reference them in your features to reduce computation time. For example, suppose you have a feature that needs the number of bids on each user's timeline:

```
SIZE(FILTER(timeline.events, (x) -> x.type == "bid"))
```

Rather than evaluating this expression repeatedly, we can precompute it once per timeline by registering it as a timeline variable and referencing it with `TIMELINE_VAR(name)` in the query's expressions:

```python
select(
    'TIMELINE_VAR("userBidCnt")',
)\
.from_events('lethe4')\
.timeline_var('userBidCnt', 'SIZE(FILTER(timeline.events, (x) -> x.type == "bid"))')\
.limit(3)
```

Query results:

partition "_default"
_c0
0 431.0
1 431.0
2 431.0
query produced 3 rows x 1 columns in 0.28 seconds
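The payoff comes when several features reference the same variable, since each of them reads the precomputed value instead of re-running the `SIZE(FILTER(...))` scan. Below is a sketch that assumes TQL expressions support basic arithmetic and comparison operators on a `TIMELINE_VAR()` value:

```python
# Three features backed by one precomputed value; the expensive FILTER over
# timeline.events runs once per timeline, not once per (event, feature) pair.
select(
    'TIMELINE_VAR("userBidCnt")',
    'TIMELINE_VAR("userBidCnt") * 2',    # assumed: arithmetic on variables
    'TIMELINE_VAR("userBidCnt") > 100',  # assumed: boolean comparison
)\
.from_events('lethe4')\
.timeline_var('userBidCnt', 'SIZE(FILTER(timeline.events, (x) -> x.type == "bid"))')\
.limit(3)
```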

If we have 1000 timelines in our project, each with 20 events, and our query has 3 features that use the bid count, pre-computing this value with a `TIMELINE_VAR()` expression speeds up computation of these features by 60x: the expensive expression is evaluated once per timeline (1,000 times) instead of once per event per feature (1000 timelines × 20 events × 3 features = 60,000 times).

Similar constructs exist for global, non-timeline-specific variables with `GLOBAL_VAR()`, and event-specific variables with `EVENT_VAR()`. Extending the previous example, let's use `EVENT_VAR()` to pre-compute the number of bids that occurred before each purchase event:

```python
select(
    'type',
    'activity.conversion',
    'EVENT_VAR("bidsBeforePurchase")',
)\
.from_events('lethe4')\
.where('type == "activity" and activity.conversion == "purchase"')\
.event_var('bidsBeforePurchase', 'SIZE(FILTER(timeline.events, (x) -> x.type == "bid" AND x.timestamp < timestamp))')\
.limit(3)
```

Query results:

partition "_default"
_c0 _c1 _c2
0 activity purchase 46.0
1 activity purchase 55.0
2 activity purchase 64.0
query produced 3 rows x 3 columns in 0.31 seconds
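For completeness, a global variable presumably holds a value computed once for the entire query and shared by every row. The sketch below assumes a `.global_var(name, expr)` builder method analogous to `timeline_var()` and `event_var()`; that name is an assumption, not an API confirmed by this page:

```python
# Hypothetical: .global_var() is assumed to mirror .timeline_var()/.event_var().
select(
    'type',
    'GLOBAL_VAR("convType")',
)\
.from_events('lethe4')\
.global_var('convType', '"purchase"')\
.limit(3)
```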