Pipeline APIs - Large Universe
Quantitative investing approaches typically involve tracking a large universe
of instruments and computing our trading signals on each one of them. On
Blueshift, we use a special construct and a special set of APIs, known as the
pipeline APIs, to do this efficiently.
A pipeline can be thought of as a computation graph, defined by a particular
combination of operations called factors and filters. A factor is an
operation that takes in some defined inputs and returns a numerical value. A
filter is similar to a factor, but returns a True or False (boolean) value
instead of a number. The advantage of using a computation graph to tackle a
repeated computation for a large universe is that computation can be lazy and
carried out in chunks. This means that when we create a pipeline or define a
factor or a filter, we do not do any computation. We just define how to carry
out the computation. The actual computation is done when we query the pipeline
for results.
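The lazy behaviour described above can be illustrated with a small toy model. This is only a sketch in plain Python: `LazyPipeline` and `last_return` are hypothetical names invented for this illustration and are not part of the Blueshift API. Adding an operation only records it, and the work happens when results are requested.

```python
class LazyPipeline:
    """Toy stand-in for a computation graph (not the Blueshift Pipeline class)."""

    def __init__(self):
        self.columns = {}      # name -> function computing one value per asset
        self.evaluations = 0   # counts how many times an operation actually ran

    def add(self, fn, name):
        # Only record the operation; nothing is computed here.
        self.columns[name] = fn

    def run(self, prices_by_asset):
        # The computation happens only when results are requested.
        results = {}
        for asset, prices in prices_by_asset.items():
            row = {}
            for name, fn in self.columns.items():
                row[name] = fn(prices)
                self.evaluations += 1
            results[asset] = row
        return results


def last_return(prices):
    # simple return between the last two prices
    return prices[-1] / prices[-2] - 1


pipe = LazyPipeline()
pipe.add(last_return, "ret")
evaluations_after_definition = pipe.evaluations   # nothing has run yet

output = pipe.run({"AAA": [100.0, 110.0], "BBB": [50.0, 45.0]})
```

Defining the graph costs nothing, so the same definition can be evaluated once per day over whatever universe is available on that day.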
Pipeline In Strategies
Using the pipeline APIs in a Blueshift strategy involves three steps. First, we need to define the pipeline object. This involves instantiating a pipeline (the computation graph) and then adding factors and filters to it. Second, we need to attach the pipeline object to the current strategy by a name so that we can refer to it later (and also so that the strategy can run and cache the results efficiently). And finally, when we need the result, we ask for it.
Below is a sample that involves all of these steps.
from blueshift.api import schedule_function, date_rules, time_rules
from blueshift.api import order_target, attach_pipeline, pipeline_output
from blueshift.pipeline import Pipeline
from blueshift.library.pipelines import period_returns, select_universe

def make_pipeline(context):
    # create the pipeline and define the factors and filters
    pipe = Pipeline()
    monthly_rets = period_returns(20, offset=0)
    liquidity = select_universe(200, 100)
    pipe.add(monthly_rets, 'momentum')
    pipe.set_screen(liquidity)
    return pipe

def initialize(context):
    schedule_function(strategy, date_rules.month_start(), time_rules.at('10:00'))
    # attach the pipeline to the strategy
    attach_pipeline(make_pipeline(context), 'strategy_pipe')

def strategy(context, data):
    try:
        # request the pipeline results
        results = pipeline_output('strategy_pipe')
    except Exception as e:
        print(f'error in getting pipeline output:{str(e)}')
        # results is undefined at this point, so skip this run
        return
    print(f'number of assets {len(results.index)}')
In the above example, we select the top 100 stocks based on 200-day average dollar volume and compute the returns over the last 20 days for each one of them.
Building the pipeline object
To do that, first we define the pipeline. We import the Pipeline class from
blueshift.pipeline and use it to create an empty object in the make_pipeline
function. Then we use a ready-made factor (period_returns) and a ready-made
filter (select_universe), both imported from the blueshift.library.pipelines
package, to achieve our goal. The select_universe function takes in a lookback
period and a size parameter, filters the whole asset universe based on average
dollar volume (averaged over the lookback period) and returns the top size
count of assets. In our case, this is the top 100 stocks based on 200-day
average dollar volume. The period_returns function takes in a lookback
parameter and an offset parameter and computes price returns between
today-offset and today-offset-lookback days. With offset=0 and lookback=20, it
gives us the returns over the most recent 20 days.
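The lookback and offset arithmetic can be sketched in plain Python. This is only an illustration: `simple_period_return` is a hypothetical helper, not the library implementation, and it assumes a simple (not log) return on a list of daily closes ordered oldest first.

```python
def simple_period_return(closes, lookback, offset=0):
    """Simple return between closes[today-offset-lookback] and closes[today-offset].

    `closes` is a list of daily closing prices, oldest first; the last
    element plays the role of "today". Hypothetical helper, not Blueshift code.
    """
    end = len(closes) - 1 - offset
    start = end - lookback
    if start < 0:
        raise ValueError("not enough history for this lookback and offset")
    return closes[end] / closes[start] - 1


closes = [100.0 + i for i in range(30)]   # 100, 101, ..., 129
r_recent = simple_period_return(closes, lookback=20)             # 129 / 109 - 1
r_shifted = simple_period_return(closes, lookback=20, offset=5)  # 124 / 104 - 1
```

A non-zero offset shifts the whole window back in time, which is a common way to skip the most recent days when building a momentum signal.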
Note that the above functions, period_returns and select_universe, only define
operations and do not perform any calculation yet. These operations are
then added to the empty pipeline object. The factor operation is added using
the add method of the pipeline object and the filter operation is added
using the set_screen method. To add multiple factors, we can use the add
method as many times as needed. For adding multiple filters, we can use the
“&” and “|” operators to logically combine them.
Finally, we return the pipeline object from the make_pipeline function.
This object is used in the attach_pipeline API function to register the
pipeline with the strategy, using a name we choose (strategy_pipe in this
case).
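To make the “&” and “|” combination of filters concrete, here is a toy model in plain Python. `ToyFilter` is a hypothetical class written for this illustration only. It is not a Blueshift class, and the volume and price thresholds are made-up numbers.

```python
class ToyFilter:
    """Toy boolean operation over an asset's data (not a Blueshift class)."""

    def __init__(self, predicate):
        self.predicate = predicate

    def __and__(self, other):
        # passes only when both filters pass
        return ToyFilter(lambda d: self.predicate(d) and other.predicate(d))

    def __or__(self, other):
        # passes when at least one filter passes
        return ToyFilter(lambda d: self.predicate(d) or other.predicate(d))

    def apply(self, universe):
        # evaluate the (possibly combined) filter over the whole universe
        return [asset for asset, data in universe.items() if self.predicate(data)]


universe = {
    "AAA": {"volume": 5_000_000, "price": 12.0},
    "BBB": {"volume": 200_000, "price": 250.0},
    "CCC": {"volume": 3_000_000, "price": 900.0},
}

liquid = ToyFilter(lambda d: d["volume"] >= 1_000_000)
expensive = ToyFilter(lambda d: d["price"] >= 100.0)

both = (liquid & expensive).apply(universe)     # ['CCC']
either = (liquid | expensive).apply(universe)   # ['AAA', 'BBB', 'CCC']
```

In a real pipeline the combined expression would presumably be handed to set_screen as a single filter, so the pipeline still has exactly one screen.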
Fetching pipeline results
Once the pipeline is defined and registered with the strategy, we can call the
pipeline_output API function to get the current output from the pipeline
we have defined. In this case, it will return the 20-day returns for the top 100
stocks for the day the result is requested.
Important
All pipeline functionality on Blueshift is based on daily data. It does not make sense, therefore, to call the pipeline computation (pipeline_output, see below) more than once a day. Also, requesting the result from a pipeline on day t will carry out all computation based on data up to the previous day (t-1), to ensure there is no look-ahead bias.
The return value from pipeline_output is always a dataframe. The index
of the dataframe is all the assets that have passed the filters we have added
to the pipeline object (using set_screen, see above), and the columns of the
dataframe are the factors we have added (using the add method, see above).
If we do not add any filters, all assets in the universe are chosen (i.e.
will be in the index of the result). If we do not add any factors, the dataframe
itself will be empty. The column name for each factor added will be the name
passed in the add method of the pipeline object while defining it.
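Since the result is a regular dataframe, the usual pandas operations apply. The sketch below builds a stand-in frame with the same layout (assets in the index, one column per factor) and picks the strongest names. The tickers and values are made up for illustration, and plain strings stand in for Blueshift asset objects.

```python
import pandas as pd

# Stand-in for pipeline_output('strategy_pipe'): assets in the index,
# one column per factor added to the pipeline.
results = pd.DataFrame(
    {"momentum": [0.08, -0.03, 0.15, 0.01]},
    index=["AAA", "BBB", "CCC", "DDD"],
)

# rank by the factor column and keep the two strongest names
top = results.sort_values("momentum", ascending=False).head(2)
selected_assets = list(top.index)

# the screen may let nothing through, so guard before trading on the result
has_candidates = not results.empty
```

In a strategy, `selected_assets` would then be the list to iterate over when placing orders.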
Some useful filters and factors
The blueshift library has some useful filters and factors readily available.
- blueshift.library.pipelines.select_universe(lookback, size, context=None)
Returns a custom filter object for volume-based filtering.
- Args:
  lookback (int): lookback window size
  size (int): top n assets to return
- Returns:
  A custom filter object
from blueshift.pipeline import Pipeline
from blueshift.library.pipelines import select_universe
pipe = Pipeline()
top_100 = select_universe(252, 100)
pipe.set_screen(top_100)
- blueshift.library.pipelines.average_volume_filter(lookback, amount, context=None)
Returns a custom filter object for volume-based filtering.
- Args:
  lookback (int): lookback window size
  amount (int): amount to filter (high-pass)
- Returns:
  A custom filter object
from blueshift.pipeline import Pipeline
from blueshift.library.pipelines import average_volume_filter
pipe = Pipeline()
volume_filter = average_volume_filter(200, 1000000)
pipe.set_screen(volume_filter)
- blueshift.library.pipelines.period_returns(lookback, offset=0, context=None)
Returns a custom factor object for computing simple returns over a period (lookback).
- Args:
  lookback (int): lookback window size
  offset (int): offset from the end of the window
- Returns:
  A custom factor object.
from blueshift.pipeline import Pipeline
from blueshift.library.pipelines import period_returns
pipe = Pipeline()
momentum = period_returns(200)
pipe.add(momentum, 'momentum')
- blueshift.library.pipelines.technical_factor(lookback, indicator_fn, indicator_lookback=None, context=None)
A factory function to generate a custom factor by applying a user-defined function over asset closing prices.
- Args:
  lookback (int): lookback window size
  indicator_fn (function): user-defined function
  indicator_lookback (int): lookback for the user-defined function
- Returns:
  A custom factor object applying the supplied function.
- Note:
  The indicator_fn must be of the form f(px, n), where px is a numpy ndarray and n is the lookback. The lookback argument above must be greater than or equal to indicator_lookback. If indicator_lookback is None, it is set to the same value as lookback.
from blueshift.pipeline import Pipeline
from blueshift.library.pipelines import technical_factor
# rsi is a user-defined function of the form f(px, n), defined elsewhere
pipe = Pipeline()
rsi_factor = technical_factor(14, rsi)
pipe.add(rsi_factor, 'rsi')
These are the most commonly used factors and filters, conveniently wrapped in a function call. Besides these, there are many built-in factors and filters that you can directly import and instantiate, or you can create your own custom filters or factors.
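As an illustration of the f(px, n) form expected by technical_factor, here is a minimal sketch of a user-defined indicator. `price_to_average` is a hypothetical function written for this example. It assumes px is a 1-D array of closing prices for a single asset, oldest first, and that a single number is expected back. The text above does not specify the exact shape passed in, so treat both as assumptions.

```python
import numpy as np


def price_to_average(px, n):
    """Hypothetical indicator of the form f(px, n).

    Assumes px is a 1-D array of closing prices for one asset, oldest
    first, and returns the last price relative to its n-period mean.
    """
    px = np.asarray(px, dtype=float)
    if n < 1 or px.shape[0] < n:
        return np.nan   # not enough data for the requested lookback
    return px[-1] / px[-n:].mean() - 1.0


px = np.array([10.0, 11.0, 12.0, 13.0, 14.0])
value = price_to_average(px, 4)   # 14 / mean(11, 12, 13, 14) - 1
```

Under those assumptions, such a function could be supplied as the indicator_fn argument, with lookback chosen to be at least as large as indicator_lookback.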
Custom Filter and Factors
To use custom filters or factors, we need to define a new class (based on the
respective base class) and implement the compute method of the class, as
shown below.
from blueshift.pipeline import Pipeline, CustomFilter, CustomFactor
from blueshift.pipeline.data import EquityPricing

class CloseAboveHighFilter(CustomFilter):
    inputs = [EquityPricing.close, EquityPricing.high]

    def compute(self, today, assets, out, close_price, high_price):
        # boolean output: True where the latest close exceeds the earlier high
        out[:] = (close_price[-1] > high_price[-2])

class CloseAboveHighFactor(CustomFactor):
    inputs = [EquityPricing.close, EquityPricing.high]

    def compute(self, today, assets, out, close_price, high_price):
        # numerical output: difference between the latest close and the earlier high
        out[:] = (close_price[-1] - high_price[-2])

def make_pipeline(context):
    pipe = Pipeline()
    myfilter = CloseAboveHighFilter(window_length=10)
    myfactor = CloseAboveHighFactor(window_length=10)
    pipe.add(myfactor, 'close_high_diff')
    pipe.set_screen(myfilter)
    return pipe
In the above code snippet, we use the same computation for both classes: a comparison of the latest close with the high of the preceding day (the second-to-last row of the window). We define a filter and a factor based on this to highlight their implementation and differences.
For the custom filter, we define the class CloseAboveHighFilter based on the
CustomFilter class. We also import EquityPricing to define the pricing
columns to be used in the class. The inputs attribute lists the required
field(s) to be used. The compute method is automatically passed a few
variables, namely 1) today: the date of computation, 2) assets: a list of all
asset objects alive on that date, 3) out: the output numpy array to hold the
results of the computation (this is a 1D array of size equal to the number of
assets). The compute function is also passed the additional columns that we
define in the inputs field (in the order they are defined). This is exactly
the same for both the filter and the factor implementation. The only difference
is that in the filter implementation the results are boolean, and in the factor
implementation they are numbers.
Once we have defined the classes, we can use them in building the pipeline by instantiating them with a window length. The window length determines how many days of data are passed to the compute function for each of the extra (pricing) fields. For example, in this case, if we assume that 2000 assets were available to trade on the computation date, the close_price and high_price arguments will each be a numpy array of shape (10, 2000): one row per day in the window and one column per asset, which is why close_price[-1] in the code above selects the latest close of every asset.
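The indexing used in the compute methods can be checked with plain numpy. The sketch below assumes the window arrays are laid out with one row per day (oldest first) and one column per asset, which is what the expressions close_price[-1] and high_price[-2] in the example imply. The prices are made up, and only 4 assets are used to keep the output readable.

```python
import numpy as np

window_length, n_assets = 10, 4

# Assumed layout: one row per day (oldest first), one column per asset.
high_price = np.full((window_length, n_assets), 100.0)
close_price = np.full((window_length, n_assets), 99.0)
close_price[-1, 0] = 101.0   # asset 0 closes above the earlier high

# Same expressions as in the compute methods; `out` has one slot per asset.
filter_out = np.empty(n_assets, dtype=bool)
filter_out[:] = close_price[-1] > high_price[-2]

factor_out = np.empty(n_assets, dtype=float)
factor_out[:] = close_price[-1] - high_price[-2]
```

Here `filter_out` marks only the first asset as passing, while `factor_out` holds the signed difference for every asset, mirroring the boolean versus numerical distinction between a filter and a factor.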