Handling Large Universe on Blueshift®

A typical quant trading problem

As we have seen before, quant trading typically involves tracking a large universe of instruments and computing our trading signals on each one of them. We use a special construct and a special set of API functions on Blueshift® to do this efficiently.

Pipelines on Blueshift

For such cases we use what are called pipelines. A pipeline can be thought of as a computation graph, defined by a particular combination of operations called factors and filters. A factor is an operation that takes in some defined inputs and returns a numerical value. A filter is similar to a factor, but returns a True or False (boolean) value instead of a number. The advantage of using a computation graph to tackle a repeated computation for a large universe is that the computation can be lazy and carried out in chunks. This means that when we create a pipeline or define a factor or a filter, we do not do any computation. We just define how to carry out the computation. The actual computation is done when we query the pipeline for results.
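
To make the idea of lazy computation concrete, here is a minimal sketch in plain Python. It is only a toy illustration, not how Blueshift® or zipline implement pipelines; the class LazyTerm, the list calls and the sample prices are all made up for this example.

```python
# Illustrative only: a toy "lazy" term, not the platform's implementation.
calls = []  # records when real work actually happens


class LazyTerm:
    """Stores how to compute a value; does no work until evaluate() is called."""

    def __init__(self, name, func):
        self.name = name
        self.func = func

    def evaluate(self, prices):
        calls.append(self.name)
        # Apply the stored operation to every asset in the universe.
        return {asset: self.func(series) for asset, series in prices.items()}


# Defining the term performs no computation: `calls` is still empty here.
mean_price = LazyTerm("mean_price", lambda series: sum(series) / len(series))

# Work happens only when we ask for results.
prices = {"AAA": [10.0, 11.0, 12.0], "BBB": [20.0, 22.0, 24.0]}
result = mean_price.evaluate(prices)
```

Only the call to evaluate touches the data, which mirrors the way a pipeline defers its work until it is queried.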

Info

All pipeline functionalities on Blueshift® are based on daily data. It does not make sense, therefore, to call the pipeline computation (pipeline_output, see below) more than once a day. The implementation is based on zipline Pipeline.

Defining factors or filters

These are the building blocks of our computation graph, i.e. the pipeline. We do have some ready-made factors and filters available, but oftentimes we have to define our own. To see how it can be done, let us scrutinize the code snippet below.

import numpy as np

from blueshift.pipeline import CustomFilter
from blueshift.pipeline.data import EquityPricing

def average_volume_filter(lookback, amount):
    """Build a filter that is True for assets whose average daily
    dollar volume over `lookback` bars exceeds `amount`."""
    class AvgDailyDollarVolumeTraded(CustomFilter):
        inputs = [EquityPricing.close, EquityPricing.volume]

        def compute(self, today, assets, out, close_price, volume):
            # each input has one row per bar and one column per asset
            dollar_volume = np.mean(close_price * volume, axis=0)
            high_volume = dollar_volume > amount
            out[:] = high_volume

    return AvgDailyDollarVolumeTraded(window_length=lookback)
Let's understand what is going on here. We first import two things - a class called CustomFilter from the pipeline module and another class called EquityPricing from the pipeline.data module. The first provides the base implementation for defining a filter as described above. The latter provides the boilerplate for declaring which input columns this filter accepts. EquityPricing implements the standard pricing columns - namely open, high, low, close and volume.

To define a custom filter, all we have to do is to re-define the function compute as we see fit, to return the filtering that we wish. The function compute is a member of the base class CustomFilter (as well as CustomFactor in case we use that). Arguments to the compute function are as follows (ignoring the self variable which is just the class instance itself):

  • today: This is the date of the computation. It is provided automatically by the platform when we run the pipeline.
  • assets: This is the list of all assets that are available to trade on the day the computation is invoked (today), again provided by the platform. It is a numpy array of length N, where N is the number of assets available as of today.
  • out: A numpy array with size equal to the number of assets in assets (N). This is where we store the results of the computation - one value (number or boolean) for each asset. We can add many factors to a pipeline. The pipeline will combine all of them into a multi-column table, one column per factor holding its out values, indexed by assets. When we query this pipeline from our strategy for a factor, the corresponding column is automatically returned.

The remaining arguments depend on what we want. In the example above, we set inputs (a class variable) to the columns close and volume from the standard pricing data, so the last two arguments of the compute function refer to them (we happen to name them close_price and volume, but we could have chosen any names). These are also numpy arrays. The number of rows in each of them is equal to the window_length of the class instance, which is the number of bars (always daily) used in the computation. The number of columns is equal to the number of assets available as of today.

In the main body of the compute function, we calculate the average daily dollar volume (close price times volume) for each asset, and compare it with the threshold amount to produce a True or False value - signifying whether the traded volume is high enough.
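
The array shapes involved can be checked outside the platform with a few lines of numpy. The sketch below uses made-up prices and volumes for 3 daily bars and 2 assets, and a hypothetical threshold; it only reproduces the arithmetic of the compute body and does not use any Blueshift® API.

```python
import numpy as np

# Made-up data: 3 daily bars (rows) for 2 assets (columns).
close_price = np.array([[10.0, 100.0],
                        [11.0, 101.0],
                        [12.0, 102.0]])
volume = np.array([[1000.0, 10.0],
                   [2000.0, 20.0],
                   [3000.0, 30.0]])
amount = 10_000.0  # hypothetical dollar-volume threshold

# axis=0 averages down the rows (over time), leaving one value per asset.
dollar_volume = np.mean(close_price * volume, axis=0)

# `out` has one slot per asset, exactly like the array handed to compute().
out = np.empty(close_price.shape[1], dtype=bool)
out[:] = dollar_volume > amount
```

With these numbers the first asset averages well above the threshold and the second well below it, so out holds one True and one False.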

The wrapper function

In the code snippet above, we implement a custom filter to compute average daily volume (> amount) - by defining a class AvgDailyDollarVolumeTraded which inherits from the base filter implementation CustomFilter. The top level function wraps this class definition with the arguments (i.e. lookback that we set as the window_length and amount that we filter on). It also conveniently creates an instance of the class and returns the same.

Custom factors

These are very similar to the filter above, except that the returned value is numerical and we derive the custom class from CustomFactor instead of CustomFilter. The next section has an example.
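
As a rough preview, the sketch below shows what the body of a factor's compute might look like. It is written as a plain function (hypothetically named period_return_compute) with made-up prices, so that it runs without the platform. In a real strategy this logic would sit inside a class derived from CustomFactor. It assumes the zipline convention that the first row of an input is the oldest bar and the last row the most recent.

```python
import numpy as np


def period_return_compute(today, assets, out, close_price):
    """Hypothetical factor body: return over the lookback window, per asset."""
    # Assumption: row 0 is the oldest bar, row -1 the most recent one.
    out[:] = close_price[-1] / close_price[0] - 1.0


# Made-up closing prices: 3 daily bars (rows) for 2 assets (columns).
close_price = np.array([[10.0, 100.0],
                        [11.0, 90.0],
                        [12.0, 80.0]])
assets = np.array(["AAA", "BBB"])  # placeholder labels, not real asset objects
out = np.empty(len(assets), dtype=float)

period_return_compute(None, assets, out, close_price)
```

The only difference from the filter is that out now receives numbers (roughly 0.2 and -0.2 here) instead of booleans.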

Building the pipeline

The above code snippet defines the operation of the filter. To use this, we need to define the computation graph, i.e. the pipeline itself.

This can typically be done by the following code snippet:

from blueshift.pipeline import Pipeline
...
def make_strategy_pipeline(context):
    pipe = Pipeline()

    # get the strategy parameters defined in initialize
    lookback = context.params['lookback']
    amount = context.params['min_volume']

    # Set the volume filter
    volume_filter = average_volume_filter(lookback, amount)

    pipe.add(volume_filter,'high_volume')

    return pipe
Here we first create an empty pipeline using the constructor Pipeline. Then we invoke the function average_volume_filter defined above. Finally we add this operation to the pipeline and return it. When we query the pipeline later, the results are dynamically computed and returned under the column high_volume (passed as the second argument to the add method).

A pipeline construction function like make_strategy_pipeline above is typically called once, in the initialize function. However, building the pipeline is not sufficient. We also need to attach it to the current strategy. This is done using the attach_pipeline API function as below:

from blueshift.api import attach_pipeline

def initialize(context):
    ... # other related code
    # Set up the pipe-lines for strategies
    attach_pipeline(make_strategy_pipeline(context), 
            name='strategy_pipeline')
The first argument is the pipeline instance we built. The second argument strategy_pipeline is the name we can refer to it by later in our strategy code as shown below.

Querying the pipeline

Once we have defined our filters and factors and built the pipeline too, we are now ready to use it. This is typically done using the API function pipeline_output as below:

from blueshift.api import pipeline_output
...
def handle_data(context, data):
    pipeline_results = pipeline_output('strategy_pipeline')
    print(pipeline_results.index)
Here we query our named pipeline strategy_pipeline. The returned result pipeline_results is a pandas DataFrame, indexed by the assets, with a column for each factor or filter added during the pipeline construction using the add method. In the above example, it will have a single column named high_volume. Each call to pipeline_output triggers the actual computation of the pipeline.
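
A common next step is to pick out the assets that passed the filter. The sketch below assumes the result behaves like a pandas DataFrame with a boolean high_volume column. The frame is built by hand with made-up labels so that the example runs without the platform; on Blueshift® the index would hold asset objects rather than strings.

```python
import pandas as pd

# Stand-in for the object returned by pipeline_output('strategy_pipeline').
pipeline_results = pd.DataFrame(
    {"high_volume": [True, False, True]},
    index=["AAA", "BBB", "CCC"],  # made-up asset labels
)

# Boolean indexing keeps only the rows where the filter column is True.
selected = pipeline_results[pipeline_results["high_volume"]].index.tolist()
```

Here selected contains the labels of the first and third rows, which is the list a strategy would typically go on to trade.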

Combining factors

We can add multiple factors using the add method during the pipeline construction, each becoming a separate column. But many times we may need to combine factors in a single column. We can do that by using standard arithmetic operators, as in the code below:

...
factor1 = first_factor(some_params)
factor2 = second_factor(other_params)
combined_factor = factor1 + factor2
Here both factor1 and factor2 define factors, i.e. some operations. The combined factor combined_factor is also a factor, defined as a combination of the others. Note that we do not carry out any computation here. The + operator is NOT operating on numbers, but on factor instances (for example, instances of classes derived from CustomFactor). The platform has the standard arithmetic operators overloaded (including +), so that we can combine factors in this way. The actual computation is done when we query the pipeline.

For filters, we use the & (and), | (or) and ~ (not) operators for combining. This is natural as they are boolean operations by design.

...
filter1 = first_filter(some_params)
filter2 = second_filter(other_params)
combined_filter = filter1 & filter2
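
How operators can combine operations rather than numbers is easiest to see in a toy example. The class Term below is a made-up stand-in, not the platform's factor or filter classes. It overloads +, &, | and ~ (the Python negation operator) so that each returns a new deferred term.

```python
class Term:
    """Toy deferred expression; a stand-in, not the platform's classes."""

    def __init__(self, func):
        self.func = func

    def evaluate(self, row):
        return self.func(row)

    def __add__(self, other):   # factor + factor
        return Term(lambda row: self.func(row) + other.func(row))

    def __and__(self, other):   # filter & filter
        return Term(lambda row: self.func(row) and other.func(row))

    def __or__(self, other):    # filter | filter
        return Term(lambda row: self.func(row) or other.func(row))

    def __invert__(self):       # ~filter
        return Term(lambda row: not self.func(row))


price = Term(lambda row: row["close"])
volume = Term(lambda row: row["volume"])
liquid = Term(lambda row: row["volume"] > 1000)
cheap = Term(lambda row: row["close"] < 50)

score = price + volume              # still a Term, nothing computed yet
liquid_not_cheap = liquid & ~cheap  # ~ binds tighter than &

# Values appear only when a term is evaluated on data (made-up numbers).
row = {"close": 120.0, "volume": 5000}
score_value = score.evaluate(row)
passes = liquid_not_cheap.evaluate(row)
```

Both score and liquid_not_cheap are themselves terms, so they can be combined further before any data is touched.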

Combining factor and filter

Finally, when we need to combine both a factor and a filter, we use the add method to add the factor and the set_screen method to add the filter, as below:

...
pipe = Pipeline()

filter1 = my_filter(some_params)
factor = my_factor(other_params)
pipe.add(factor, "some_factor_name")
pipe.set_screen(filter1)
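
The effect of a screen can be pictured with plain Python dictionaries. The sketch below assumes that set_screen behaves as it does in zipline, where the output keeps only the assets for which the screen is True. The asset labels and values are made up.

```python
# Made-up per-asset values for a single day.
factor_values = {"AAA": 0.12, "BBB": -0.03, "CCC": 0.07}
screen_values = {"AAA": True, "BBB": True, "CCC": False}

# Rows survive only where the screen is True; the factor becomes the column.
screened = {
    asset: {"some_factor_name": value}
    for asset, value in factor_values.items()
    if screen_values[asset]
}
```

Under that assumption, the asset CCC is dropped from the result entirely rather than appearing with a False flag.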

Strategy example using pipeline

A simple strategy example using pipeline is available in the default templates when you create a strategy - under the name SMA Pipeline. For a more advanced example, continue to the next section.