Pipeline APIs - Large Universe

Quantitative investing approaches typically involve tracking a large universe of instruments and computing trading signals on each one of them. On Blueshift, we use a special construct and a special set of APIs, known as the pipeline APIs, to do this efficiently.

A pipeline can be thought of as a computation graph, defined by a particular combination of operations called factors and filters. A factor is an operation that takes in some defined inputs and returns a numerical value. A filter is similar to a factor, but returns a True or False (boolean) value instead of a number. The advantage of using a computation graph to tackle a repeated computation over a large universe is that the computation can be lazy and carried out in chunks. This means that when we create a pipeline or define a factor or a filter, we do not do any computation; we just define how to carry out the computation. The actual computation is done when we query the pipeline for results.
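
For instance, defining a factor using one of the library helpers (introduced below) only creates a node of the computation graph; no price data is touched at this point:

from blueshift.library.pipelines import period_returns

# this only describes the computation; nothing is evaluated yet
monthly_rets = period_returns(20)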

Pipeline In Strategies

Using the pipeline APIs in a Blueshift strategy involves three steps. First, we need to define the pipeline object. This involves instantiating a pipeline (the computation graph) and then adding factors and filters to it. Second, we need to attach the pipeline object to the current strategy by a name so that we can refer to it later (and also so that the strategy can run and cache the results efficiently). And finally, when we need the result, we ask for it.

Below is a sample that involves all of these steps.

from blueshift.api import schedule_function, date_rules, time_rules
from blueshift.api import order_target, attach_pipeline, pipeline_output

from blueshift.pipeline import Pipeline
from blueshift.library.pipelines import period_returns, select_universe

def make_pipeline(context):
    # create the pipeline and define the factors and filters
    pipe = Pipeline()
    monthly_rets = period_returns(20, offset=0)
    liquidity = select_universe(200, 100)
    pipe.add(monthly_rets,'momentum')
    pipe.set_screen(liquidity)
    return pipe

def initialize(context):
    schedule_function(strategy, date_rules.month_start(), time_rules.at('10:00'))
    # attach the pipeline to the strategy
    attach_pipeline(make_pipeline(context), 'strategy_pipe')

def strategy(context, data):
    try:
        # request the pipeline results
        results = pipeline_output('strategy_pipe')
    except Exception as e:
        print(f'error in getting pipeline output:{str(e)}')
        return

    print(f'number of assets {len(results.index)}')

In the above example, we select the top 100 stocks based on 200-day average dollar volume and compute the returns over the last 20 days for each one of them.

Building the pipeline object

To do that, first we define the pipeline. We import the Pipeline class from blueshift.pipeline and use it to create an empty object in the make_pipeline function. Then we use a ready-made factor (period_returns) and a ready-made filter (select_universe), both imported from the blueshift.library.pipelines package, to achieve our goal. The select_universe filter takes in a lookback period and a size parameter, filters the whole asset universe based on average dollar volume (averaged over the lookback period) and returns the top size assets. In our case, this is the top 100 stocks based on 200-day average dollar volume. The period_returns factor takes in a lookback parameter and an offset parameter and computes price returns between (today - offset) and (today - offset - lookback) days. With offset=0 and lookback=20, it gives us the 20-day returns ending today.
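
For instance, changing the offset shifts the return window back in time, which is a common way to skip the most recent days in a momentum signal. A small sketch with illustrative parameter values:

from blueshift.library.pipelines import period_returns

# 20-day returns ending today: prices between t-20 and t
recent_rets = period_returns(20, offset=0)
# 20-day returns ending 5 days ago: prices between t-25 and t-5
lagged_rets = period_returns(20, offset=5)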

Note that the above functions, period_returns and select_universe, define operations and do not do any calculations (not yet). These operations are then added to the empty pipeline object. The factor operation is added using the add method of the pipeline object and the filter operation is added using the set_screen method. To add multiple factors, we can use the add method as many times as needed. To apply multiple filters, we can use the "&" and "|" operators to logically combine them into a single screen, as shown in the sketch below.
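
For example, a single pipeline can carry several factors under different column names, and two filters can be combined into one screen. A sketch using the library helpers documented below (parameter values are illustrative):

from blueshift.pipeline import Pipeline
from blueshift.library.pipelines import period_returns, select_universe, average_volume_filter

pipe = Pipeline()
# two factors, added under different column names
pipe.add(period_returns(20), 'monthly_momentum')
pipe.add(period_returns(60), 'quarterly_momentum')
# two filters combined with a logical AND into a single screen
liquid = select_universe(200, 100)
active = average_volume_filter(200, 1000000)
pipe.set_screen(liquid & active)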

Finally, we return the pipeline object from the make_pipeline function. This object is used in the attach_pipeline API function to register the pipeline with the strategy, using a name we choose (strategy_pipe in this case).

Fetching pipeline results

Once the pipeline is defined and registered with the strategy, we can call the pipeline_output API function to get the current output from the pipeline we have defined. In this case, it will return the 20-day returns for top 100 stocks for the day the result is requested.

Important

All pipeline functionalities on Blueshift are based on daily data. It does not make sense, therefore, to call the pipeline computation (via pipeline_output) more than once a day. Also, requesting results from a pipeline on day t will carry out all computations based on data up to the previous day (t-1), to ensure there is no look-ahead bias.

The return value from pipeline_output is always a dataframe. The index of the dataframe is all the assets that have passed the filters we have added to the pipeline object (using set_screen, see above), and the columns of the dataframe are the factors we have added (using the add method, see above). If we do not add any filters, all assets in the universe are selected (i.e. they will all be in the index of the result). If we do not add any factors, the dataframe itself will be empty. The column name for each factor is the name passed to the add method of the pipeline object while defining it.
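
A typical momentum strategy would rank this dataframe by the factor column and trade the top names. A minimal sketch reusing the pipeline registered above and the API functions imported in the first example (the number of picks and the target of 10 shares per asset are arbitrary choices for illustration):

def strategy(context, data):
    results = pipeline_output('strategy_pipe')
    # rank by the 'momentum' column added via pipe.add and keep the top 10
    top_picks = results.sort_values('momentum', ascending=False).head(10)
    for asset in top_picks.index:
        order_target(asset, 10)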

Some useful filters and factors

The blueshift library has some useful filters and factors readily available.

blueshift.library.pipelines.select_universe(lookback, size, context=None)

Returns a custom filter object for volume-based filtering.

Args:

lookback (int): lookback window size
size (int): top n assets to return.

Returns:

A custom filter object

# from blueshift.library.pipelines import select_universe

pipe = Pipeline()
top_100 = select_universe(252, 100)
pipe.set_screen(top_100)

blueshift.library.pipelines.average_volume_filter(lookback, amount, context=None)

Returns a custom filter object for volume-based filtering.

Args:

lookback (int): lookback window size
amount (int): amount to filter (high-pass)

Returns:

A custom filter object

# from blueshift.library.pipelines import average_volume_filter

pipe = Pipeline()
volume_filter = average_volume_filter(200, 1000000)
pipe.set_screen(volume_filter)

blueshift.library.pipelines.period_returns(lookback, offset=0, context=None)

Returns a custom factor object for computing simple returns over a period (lookback).

Args:

lookback (int): lookback window size
offset (int): offset from the end of the window

Returns:

A custom factor object.

# from blueshift.library.pipelines import period_returns
pipe = Pipeline()
momentum = period_returns(200)
pipe.add(momentum,'momentum')

blueshift.library.pipelines.technical_factor(lookback, indicator_fn, indicator_lookback=None, context=None)

A factory function to generate a custom factor by applying a user-defined function over asset closing prices.

Args:

lookback (int): lookback window size
indicator_fn (function): user-defined function
indicator_lookback (int): lookback for user-defined function.

Returns:

A custom factor object applying the supplied function.

Note:

The indicator_fn must be of the form f(px, n), where px is a numpy ndarray of prices and n is the indicator lookback. Also, the lookback argument above must be greater than or equal to indicator_lookback. If indicator_lookback is None, it is set to the same value as lookback.

# from blueshift.library.pipelines import technical_factor

pipe = Pipeline()
# rsi is an indicator function of the form f(px, n), see the note above
rsi_factor = technical_factor(14, rsi)
pipe.add(rsi_factor,'rsi')
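
To make the required f(px, n) form explicit, here is a sketch with a hand-written indicator; the rate_of_change helper below is our own illustrative function and not part of the Blueshift library:

from blueshift.pipeline import Pipeline
from blueshift.library.pipelines import technical_factor

def rate_of_change(px, n):
    # px is a numpy ndarray of closing prices, n is the indicator lookback
    return (px[-1] - px[-n]) / px[-n]

pipe = Pipeline()
# 20 days of prices are fetched, the indicator uses the last 10 of them
roc_factor = technical_factor(20, rate_of_change, 10)
pipe.add(roc_factor, 'roc')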

These are the most commonly used factors and filters, conveniently wrapped in function calls. Besides these, there are many built-in factors and filters that you can directly import and instantiate, or you can create your own custom filters or factors.

Custom Filters and Factors

To use custom filters or factors, we need to define a new class (based on the respective base class) and implement the compute method of the class, as shown below.

from blueshift.pipeline import Pipeline, CustomFilter, CustomFactor
from blueshift.pipeline.data import EquityPricing

class CloseAboveHighFilter(CustomFilter):
    inputs = [EquityPricing.close, EquityPricing.high]
    def compute(self, today, assets, out, close_price, high_price):
        # boolean output: is the latest close above the high from 2 days ago?
        out[:] = (close_price[-1] > high_price[-2])

class CloseAboveHighFactor(CustomFactor):
    inputs = [EquityPricing.close, EquityPricing.high]
    def compute(self, today, assets, out, close_price, high_price):
        # numerical output: difference between the latest close and the high from 2 days ago
        out[:] = (close_price[-1] - high_price[-2])

def make_pipeline(context):
    pipe = Pipeline()
    myfilter = CloseAboveHighFilter(window_length=10)
    myfactor = CloseAboveHighFactor(window_length=10)
    pipe.add(myfactor,'close_high_diff')
    pipe.set_screen(myfilter)
    return pipe

In the above code snippet, we use the same computation, a comparison of the last close against the high price from 2 days before. We define both a filter and a factor based on it to highlight their implementation and differences.

For the custom filter, we define the class CloseAboveHighFilter based on the CustomFilter class. We also import EquityPricing to define the pricing columns to be used in the class. The inputs attribute lists the required field(s). The compute method automatically gets passed a few variables, namely 1) today: the date of computation, 2) assets: the list of all asset objects alive on that date, and 3) out: the output numpy array that holds the results of the computation (a 1D array of size equal to the number of assets). The compute method also gets passed the additional columns defined in the inputs attribute (in the order they are defined). This is exactly the same for both the filter and the factor implementation. The only difference is that in the filter implementation the results are booleans, and in the factor they are numbers.

Once we have defined the classes, we can use them in building the pipeline by instantiating them with a window length. The window length determines the size of the data passed to the compute method through the extra (pricing) fields. For example, in this case, if we assume that on the computation date there were a total of 2000 assets available to trade, the close_price and high_price arguments will each be a numpy array of shape (10, 2000), i.e. window length by number of assets.
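
Finally, the custom classes plug into a strategy exactly like the library helpers. A sketch of the wiring, reusing the make_pipeline above (the pipeline name 'breakout_pipe' is an arbitrary choice, and a daily schedule is assumed alongside the month_start rule used earlier):

from blueshift.api import schedule_function, date_rules, time_rules
from blueshift.api import attach_pipeline, pipeline_output

def initialize(context):
    attach_pipeline(make_pipeline(context), 'breakout_pipe')
    schedule_function(rebalance, date_rules.every_day(), time_rules.at('10:00'))

def rebalance(context, data):
    results = pipeline_output('breakout_pipe')
    # only assets passing CloseAboveHighFilter are in the index;
    # 'close_high_diff' is the column name passed to pipe.add above
    print(results['close_high_diff'].sort_values(ascending=False).head())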