Pipeline APIs - Large Universe¶
Quantitative investing approaches typically involve tracking a large universe of instruments and computing our trading signals on each one of them. On Blueshift, we use a special construct and a special set of APIs, known as the pipeline APIs, to do this efficiently.
A pipeline can be thought of as a computation graph, defined by a particular combination of operations called factors and filters. A factor is an operation that takes in some defined inputs and returns a numerical value. A filter is similar to a factor, but returns a True or False (boolean) value instead of a number. The advantage of using a computation graph to tackle a repeated computation for a large universe is that the computation can be lazy and carried out in chunks. This means that when we create a pipeline or define a factor or a filter, we do not do any computation. We only define how to carry out the computation. The actual computation is done when we query the pipeline for results.
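The lazy-evaluation idea can be illustrated with a small, self-contained sketch in plain Python. This is not how Blueshift implements pipelines; the `LazyPipeline` class and its methods are hypothetical names made up for this illustration. The point is only that adding an operation records it, and the work happens when the output is requested.

```python
# Toy illustration of lazy evaluation. `LazyPipeline` is a hypothetical
# class written for this sketch; it is not part of the Blueshift API.

class LazyPipeline:
    def __init__(self):
        self._columns = {}       # name -> function(prices) -> number
        self.compute_calls = 0   # counts how often real work happens

    def add(self, fn, name):
        # Only record the operation; nothing is computed here.
        self._columns[name] = fn

    def output(self, universe):
        # The actual computation happens only when results are requested.
        results = {}
        for asset, prices in universe.items():
            row = {}
            for name, fn in self._columns.items():
                row[name] = fn(prices)
                self.compute_calls += 1
            results[asset] = row
        return results


pipe = LazyPipeline()
pipe.add(lambda px: px[-1] / px[0] - 1, "total_return")
pipe.add(lambda px: max(px), "highest_price")
calls_before_query = pipe.compute_calls  # nothing has run yet

# Made-up price histories, oldest value first.
universe = {"AAA": [100.0, 110.0, 121.0], "BBB": [50.0, 45.0, 40.0]}
results = pipe.output(universe)
```

Defining the two columns costs nothing; the four evaluations (two assets times two columns) happen inside `output`.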
Pipeline In Strategies¶
Using the pipeline APIs in a Blueshift strategy involves three steps. First, we need to define the pipeline object. This involves instantiating a pipeline (the computation graph) and then adding factors and filters to it. Second, we need to attach the pipeline object to the current strategy by a name so that we can refer to it later (and also so that the strategy can run and cache the results efficiently). And finally, when we need the result, we ask for it.
Below is a sample that involves all of these steps.
from blueshift.api import schedule_function, date_rules, time_rules
from blueshift.api import order_target, attach_pipeline, pipeline_output
from blueshift.pipeline import Pipeline
from blueshift.library.pipelines import period_returns, select_universe

def make_pipeline(context):
    # create the pipeline and define the factors and filters
    pipe = Pipeline()
    monthly_rets = period_returns(20, offset=0)
    liquidity = select_universe(200, 100)
    pipe.add(monthly_rets,'momentum')
    pipe.set_screen(liquidity)
    return pipe

def initialize(context):
    schedule_function(strategy, date_rules.month_start(), time_rules.at('10:00'))
    # attach the pipeline to the strategy
    attach_pipeline(make_pipeline(context), 'strategy_pipe')

def strategy(context, data):
    try:
        # request the pipeline results
        results = pipeline_output('strategy_pipe')
    except Exception as e:
        print(f'error in getting pipeline output:{str(e)}')
        # nothing to report if the pipeline query failed
        return
    print(f'number of assets {len(results.index)}')
In the above example, we select the top 100 stocks based on 200-day average dollar volume and compute the returns over the last 20 days for each one of them.
Building the pipeline object¶
To do that, first we define the pipeline. We import the Pipeline class from blueshift.pipeline and use it to create an empty object in the make_pipeline function. Then we use a readymade factor (period_returns) and a readymade filter (select_universe), both imported from the blueshift.library.pipelines package, to achieve our goal. The select_universe function takes in a lookback period and a size parameter to filter the whole asset universe based on average dollar volume (averaged over the lookback period) and returns the top size count of assets. In our case, this is the top 100 stocks based on 200-day average dollar volume. The period_returns function takes in a lookback parameter and an offset parameter and computes price returns between today-offset-lookback and today-offset days. With offset=0 and lookback=20, it gives us the 20-day returns ending at the most recent day.
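The lookback and offset arithmetic can be checked by hand with a small sketch. The exact formula used inside period_returns is not shown here, so this assumes a simple (arithmetic) return between the two endpoints; `simple_period_return` is a hypothetical helper written only for this illustration.

```python
def simple_period_return(prices, lookback, offset=0):
    """Simple return between (last - offset - lookback) and (last - offset).

    `prices` is a list of daily closes, oldest first. Hypothetical helper
    for illustration; not a Blueshift function.
    """
    end = len(prices) - 1 - offset
    start = end - lookback
    if start < 0:
        raise ValueError("not enough price history")
    return prices[end] / prices[start] - 1.0


# Made-up daily closes, oldest first.
closes = [100.0, 102.0, 101.0, 105.0, 110.0, 108.0]

# offset=0: compares the latest close (108) with the close 2 days earlier (105)
r_latest = simple_period_return(closes, lookback=2)

# offset=1: shifts the whole window back one day (110 versus 101)
r_shifted = simple_period_return(closes, lookback=2, offset=1)
```

A non-zero offset is useful when the most recent days should be skipped, for example to leave out a short-term reversal window from a momentum signal.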
Note that the above functions, period_returns and select_universe, define operations and do not do any calculations (not yet). These operations are then added to the empty pipeline object. The factor operation is added using the add method of the pipeline object and the filter operation is added using the set_screen method. To add multiple factors, we can use the add method as many times as needed. For adding multiple filters, we can use the “&” and “|” operators to logically combine them.
Finally, we return the pipeline object from the make_pipeline function. This object is used in the attach_pipeline API function to register the pipeline with the strategy, using a name we choose (strategy_pipe in this case).
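One way to picture how “&” and “|” combine filters is operator overloading: each operator builds a new filter out of two existing ones, without evaluating anything until the filter is applied. The sketch below uses a hypothetical `ToyFilter` class and made-up data; it illustrates the idea only and is not the Blueshift filter class.

```python
class ToyFilter:
    """Hypothetical stand-in for a pipeline filter: wraps a predicate."""

    def __init__(self, predicate):
        self.predicate = predicate

    def __and__(self, other):
        # Combined filter passes only if both parts pass.
        return ToyFilter(lambda row: self.predicate(row) and other.predicate(row))

    def __or__(self, other):
        # Combined filter passes if either part passes.
        return ToyFilter(lambda row: self.predicate(row) or other.predicate(row))

    def apply(self, universe):
        return [name for name, row in universe.items() if self.predicate(row)]


# Made-up universe: average traded volume and a past return per asset.
universe = {
    "AAA": {"volume": 5e6, "ret": 0.10},
    "BBB": {"volume": 2e7, "ret": -0.05},
    "CCC": {"volume": 3e7, "ret": 0.08},
}
liquid = ToyFilter(lambda row: row["volume"] > 1e7)
rising = ToyFilter(lambda row: row["ret"] > 0)

both = (liquid & rising).apply(universe)    # liquid AND rising
either = (liquid | rising).apply(universe)  # liquid OR rising
```

Only the combined filter is passed on, which matches the pattern of building one expression and handing it to set_screen.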
Fetching pipeline results¶
Once the pipeline is defined and registered with the strategy, we can call the pipeline_output API function to get the current output from the pipeline we have defined. In this case, it will return the 20-day returns for the top 100 stocks for the day the result is requested.
Important
All pipeline functionalities on Blueshift are based on daily data. It does not make sense, therefore, to call the pipeline computation (pipeline_output, see below) more than once a day. Also, requesting a result from a pipeline on day t will carry out all computation based on data up to the previous day (t-1), to ensure no look-ahead bias.
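The t-1 rule can be pictured with a small conceptual sketch: a computation requested on day t only sees closes dated strictly before t. The helper `window_before` and the prices are hypothetical and exist only for this illustration.

```python
from datetime import date

def window_before(closes_by_day, today, window_length):
    """Return the last `window_length` closes dated strictly before `today`."""
    past_days = sorted(day for day in closes_by_day if day < today)
    return [closes_by_day[day] for day in past_days[-window_length:]]


# Made-up daily closes keyed by trading day.
closes_by_day = {
    date(2024, 1, 8): 100.0,
    date(2024, 1, 9): 101.5,
    date(2024, 1, 10): 99.0,
    date(2024, 1, 11): 103.0,   # "today": not visible to the computation
}

window = window_before(closes_by_day, today=date(2024, 1, 11), window_length=2)
```

The close of 103.0 on the request date never enters the window, which is what rules out look-ahead bias.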
The return value from pipeline_output is always a dataframe. The index of the dataframe is all the assets that have passed the filters we have defined on the pipeline object (using set_screen, see above), and the columns of the dataframe are the factors we have added (using the add method, see above). If we do not add any filters, all assets in the universe are chosen (i.e. will be in the index of the result). If we do not add any factors, the dataframe itself will be empty. The column name for each factor added will be the name passed in the add method of the pipeline object while defining it.
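Since the result is a dataframe, the usual pandas operations apply. The sketch below builds a mock result by hand to show the layout and a typical selection step. The asset labels are plain strings here as a simplifying assumption (a real result is indexed by asset objects), and all values are made up.

```python
import pandas as pd

# Mock pipeline result: one row per asset that passed the screen,
# one column per factor added with `add`. Values are invented.
results = pd.DataFrame(
    {
        "momentum": [0.12, -0.03, float("nan"), 0.07],
        "beta": [0.4, 1.1, 0.9, 0.2],
    },
    index=["AAA", "BBB", "CCC", "DDD"],
)

# Drop assets with missing factor values, then keep the two best by momentum.
clean = results.dropna()
top2 = clean.sort_values("momentum", ascending=False).head(2)
selected = top2.index.tolist()
```

The list in `selected` is what a strategy would typically pass on to its ordering logic.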
Some useful filters and factors¶
The blueshift library has some useful filters and factors readily available.
- blueshift.library.pipelines.select_universe(lookback, size, context=None)
Returns a custom filter object for volume-based filtering.
- Args:
lookback (int): lookback window size
size (int): Top n assets to return.
- Returns:
A custom filter object
# from blueshift.library.pipelines.pipelines import select_universe
pipe = Pipeline()
top_100 = select_universe(252, 100)
pipe.set_screen(top_100)
- blueshift.library.pipelines.average_volume_filter(lookback, amount, context=None)
Returns a custom filter object for volume-based filtering.
- Args:
lookback (int): lookback window size
amount (int): amount to filter (high-pass)
- Returns:
A custom filter object
# from blueshift.library.pipelines.pipelines import average_volume_filter
# then inside the pipeline builder function
pipe = Pipeline()
volume_filter = average_volume_filter(200, 1000000)
pipe.set_screen(volume_filter)
- blueshift.library.pipelines.period_returns(lookback, offset=0, context=None)
Returns a custom factor object for computing simple returns over a period (lookback).
- Args:
lookback (int): lookback window size
offset (int): offset from the end of the window
- Returns:
A custom factor object.
# from blueshift.library.pipelines.pipelines import period_returns
# then inside the pipeline builder function
pipe = Pipeline()
momentum = period_returns(200)
pipe.add(momentum,'momentum')
- blueshift.library.pipelines.technical_factor(lookback, indicator_fn, indicator_lookback=None, context=None)
A factory function to generate a custom factor by applying a user-defined function over asset closing prices.
- Args:
lookback (int): lookback window size
indicator_fn (function): user-defined function
indicator_lookback (int): lookback for user-defined function.
- Returns:
A custom factor object applying the supplied function.
- Note:
The indicator_fn must be of the form f(px, n), where px is a numpy ndarray and n is the lookback. Also, the lookback argument above must be greater than or equal to the indicator_lookback argument. If indicator_lookback is None, it is set to the same value as lookback.
# from blueshift.library.pipelines.pipelines import technical_factor
# from blueshift.library.technicals.indicators import rsi
# then inside the pipeline builder function
pipe = Pipeline()
rsi_factor = technical_factor(14, rsi)
pipe.add(rsi_factor,'rsi')
The functions listed above wrap the most commonly used factors and filters in convenient function calls. Besides these, there are many built-in factors and filters that you can directly import and instantiate, or you can create your own custom filters or factors.
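For technical_factor, the user-defined function must follow the f(px, n) form. A minimal sketch of such a function is shown below. The name `my_rate_of_change` is hypothetical, and the sketch assumes that time runs along the first axis of px; it is not the library's own roc or rsi implementation.

```python
import numpy as np

def my_rate_of_change(px, n):
    """Percentage change of the latest value versus the value n bars earlier.

    Follows the f(px, n) form. Assumes time runs along the first axis of
    `px`. Hypothetical example function, not part of the Blueshift library.
    """
    px = np.asarray(px, dtype=float)
    if px.shape[0] <= n:
        # Not enough bars in the window to look n bars back.
        return np.nan
    return 100.0 * (px[-1] / px[-1 - n] - 1.0)


# Made-up closing prices, oldest first.
px = np.array([100.0, 101.0, 103.0, 102.0, 106.0, 110.0])
value = my_rate_of_change(px, 5)       # compares 110 with 100
too_short = my_rate_of_change(px, 6)   # window holds only 6 bars
```

The second call shows why the window passed in must be long enough for the indicator: this particular function needs at least n + 1 bars to look n bars back.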
Custom Filter and Factors¶
To use custom filters or factors, we need to define a new class (based on the respective base class) and implement the compute method of the class, as shown below.
from blueshift.pipeline import Pipeline, CustomFilter, CustomFactor
from blueshift.pipeline.data import EquityPricing

class CloseAboveHighFilter(CustomFilter):
    inputs = [EquityPricing.close, EquityPricing.high]
    def compute(self,today,assets,out,close_price, high_price):
        # boolean output: True where the last close is above the earlier high
        out[:] = (close_price[-1] > high_price[-2])

class CloseAboveHighFactor(CustomFactor):
    inputs = [EquityPricing.close, EquityPricing.high]
    def compute(self,today,assets,out,close_price, high_price):
        # numerical output: difference between the last close and the earlier high
        out[:] = (close_price[-1] - high_price[-2])

def make_pipeline(context):
    pipe = Pipeline()
    myfilter = CloseAboveHighFilter(window_length=10)
    myfactor = CloseAboveHighFactor(window_length=10)
    pipe.add(myfactor,'close_high_diff')
    pipe.set_screen(myfilter)
    return pipe
In the above code snippet, we use the same computation, a comparison of last close vs high price 2 days before. We define a filter and a factor based on this to highlight their implementation and differences.
For the custom filter, we define the class CloseAboveHighFilter based on the CustomFilter class. We also import EquityPricing to define the pricing columns to be used in the class. The inputs attribute lists the required field(s) to be used. The compute method is automatically passed a few variables, namely 1) today: the date of computation, 2) assets: the list of all asset objects alive on that date, and 3) out: the output numpy array to hold the results of the computation (this is a 1D array of size equal to the number of assets). The compute function is also passed the additional columns that we define in the inputs field (in the order they are defined). This is exactly the same for both the filter and the factor implementation. The only difference is that in the filter implementation the results are boolean, and in the factor implementation they are numbers.
Once we have defined the classes, we can use them in building the pipeline by instantiating them with a window length. The window length determines the size of the data passed to the compute function for the extra (pricing) fields. For example, in this case, if we assume that on the computation date there were a total of 2000 assets available to trade, the close_price and high_price arguments will each be a numpy array of shape (10, 2000): one row per day in the window and one column per asset. This is why close_price[-1] in the code above selects the latest values for all assets at once.
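The array shapes can be checked outside the platform with plain numpy. The compute methods above index close_price[-1] to obtain one value per asset, so this sketch assumes that time runs along the first axis (rows are days, columns are assets). The data is random and the small sizes are stand-ins for a real universe.

```python
import numpy as np

window_length, n_assets = 10, 4   # small stand-ins for e.g. 10 days x 2000 assets
rng = np.random.default_rng(0)

# Each input is a 2D array: one row per day, one column per asset.
close_price = 100 + rng.normal(size=(window_length, n_assets)).cumsum(axis=0)
high_price = close_price + rng.uniform(0.0, 1.0, size=(window_length, n_assets))

# `out` is a 1D array with one slot per asset.
filter_out = np.empty(n_assets, dtype=bool)
factor_out = np.empty(n_assets, dtype=float)

# Same expressions as in the compute methods of the custom filter and factor.
filter_out[:] = close_price[-1] > high_price[-2]
factor_out[:] = close_price[-1] - high_price[-2]
```

The filter output is exactly the sign test of the factor output, which is the only difference between the two classes: one writes booleans, the other writes numbers.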
Pipeline Examples¶
Below are a few concrete examples of pipeline-based strategies. The first is a simple timeseries momentum strategy based on returns over the last 3 months. The second example adds a low beta filter to the same strategy, and the third adds some fundamentals filtering to improve the outcome. Note the general flow: we define the strategy (and any custom filters or factors if required), and then attach the pipeline object to the strategy with a name using the attach_pipeline API method. Later we query the output for a specific day using the pipeline_output API method.
Example 1 - Simple Timeseries Momentum¶
This strategy picks the best 5 stocks every month, based on returns over the last 3 months, and allocates capital equally among them. The filters used are a liquidity filter (dollar traded volume > 10M) and a positive-returns filter (since it is a long-only strategy).
from blueshift.library.pipelines.pipelines import average_volume_filter, technical_factor
from blueshift.library.technicals.indicators import roc
from blueshift.pipeline import Pipeline
from blueshift.errors import NoFurtherDataError
from blueshift.api import order_target_percent, schedule_function, log_info
from blueshift.api import date_rules, time_rules, get_datetime
from blueshift.api import attach_pipeline, pipeline_output

def initialize(context):
    # The context variables can be accessed by other methods
    context.params = {'lookback':3,
                      'size':5,
                      'min_volume':1E7
                      }
    # Call rebalance function on the first trading day of each month
    schedule_function(run_strategy, date_rules.month_start(),
                      time_rules.market_close(minutes=30))
    # Set up the pipe-lines for strategies
    attach_pipeline(make_screener(context), name='my_screener')

def make_screener(context):
    pipe = Pipeline()
    # get the strategy parameters
    lookback = context.params['lookback']*21
    v = context.params['min_volume']
    # Set the volume filter - this is a builtin filter
    volume_filter = average_volume_filter(lookback, v)
    # compute past returns - using a builtin factor
    roc_factor = technical_factor(lookback+5, roc, lookback)
    pipe.add(roc_factor,'roc')
    roc_filter = roc_factor > 0 # positive momentum only for long-only
    # add the filtering
    pipe.set_screen(roc_filter & volume_filter)
    return pipe

def screener(context, data):
    try:
        pipeline_results = pipeline_output('my_screener')
    except NoFurtherDataError:
        log_info('no pipeline for {}'.format(get_datetime()))
        return []
    pipeline_results = pipeline_results.dropna() # remove NAs
    selected = pipeline_results.sort_values(
        'roc')[-(context.params['size']):] # pick up best N stocks
    # return the asset list, note the pipeline result dataframe has
    # assets (that survived the filters) as index and the computed
    # factors as the columns
    return selected.index.tolist()

def run_strategy(context, data):
    # get screened assets to buy
    assets = screener(context, data)
    current_holdings = context.portfolio.positions.keys() # current holdings
    exits = set(current_holdings) - set(assets) # assets to sell
    for asset in exits:
        # exit the exiting assets
        order_target_percent(asset, 0)
    if assets:
        # rebalance the new list with equal weights
        sizing = 1.0/len(assets)
        for asset in assets:
            order_target_percent(asset, sizing)
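The rebalancing step in run_strategy can be tested on its own, away from the platform. The sketch below reproduces the same logic with plain Python: assets that are held but no longer selected get a target of zero, and the selected assets share capital equally. The helper `plan_rebalance` and the asset labels are hypothetical.

```python
def plan_rebalance(current_holdings, targets):
    """Map each asset to its target weight.

    Exits (held but no longer selected) get 0.0; selected assets get
    equal weights. Hypothetical helper mirroring the logic in run_strategy.
    """
    orders = {asset: 0.0 for asset in set(current_holdings) - set(targets)}
    if targets:
        weight = 1.0 / len(targets)
        for asset in targets:
            orders[asset] = weight
    return orders


# Holding three assets, screener now selects two (one of them new).
orders = plan_rebalance(["AAA", "BBB", "CCC"], ["BBB", "DDD"])

# An empty selection liquidates everything and buys nothing.
all_exits = plan_rebalance(["AAA"], [])
```

Each entry corresponds to one order_target_percent call in the strategy. The `if targets` guard matters: without it an empty screener result would cause a division by zero.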
Example 2 - Add a low beta filter¶
In this example, we modify the pipeline building function to add a low beta filter. To do that, we define a custom factor that computes the beta of stocks relative to the market index. Note that the function vectorized_beta takes in an array of dependent returns (numpy array) and a vector of independent returns and computes the regression beta.
from blueshift.library.pipelines.pipelines import average_volume_filter, technical_factor
from blueshift.library.technicals.indicators import roc
from blueshift.pipeline import Pipeline
from blueshift.pipeline.factors.statistical import vectorized_beta
from zipline_pipeline.pipeline import CustomFactor
from blueshift.pipeline.data import EquityPricing
import warnings
import numpy as np
from blueshift.errors import NoFurtherDataError
from blueshift.api import order_target_percent, schedule_function, log_info
from blueshift.api import symbol, date_rules, time_rules, get_datetime
from blueshift.api import attach_pipeline, pipeline_output

def compute_beta(context, asset, lookback):
    class SimpleBeta(CustomFactor):
        inputs = [EquityPricing.close]
        def compute(self,today,assets,out,close):
            n = len(close)
            nbars = n + 10
            px = context.data_portal.history(asset,'close',nbars,'1d',dt=today)
            px = px.iloc[-n:].values
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=RuntimeWarning)
                independent = np.diff(px)/px[:(n-1)]
                independent = independent.reshape(n-1,1)
                dependent = np.diff(close, axis=0)/close[:(n-1)]
                out[:] = vectorized_beta(dependent, independent, int(n/4))
    return SimpleBeta(window_length = lookback)

def initialize(context):
    # The context variables can be accessed by other methods
    context.params = {'lookback':3,
                      'size':5,
                      'min_volume':1E7
                      }
    # Call rebalance function on the first trading day of each month
    schedule_function(run_strategy, date_rules.month_start(),
                      time_rules.market_close(minutes=30))
    # Set up the pipe-lines for strategies
    attach_pipeline(make_screener(context), name='my_screener')

def make_screener(context):
    pipe = Pipeline()
    # get the strategy parameters
    lookback = context.params['lookback']*21
    v = context.params['min_volume']
    # Set the volume filter - this is a builtin filter
    volume_filter = average_volume_filter(lookback, v)
    # add a beta filter
    market = symbol('NIFTY')
    beta = compute_beta(context, market, 60)
    # compute past returns - using a builtin factor
    roc_factor = technical_factor(lookback+5, roc, lookback)
    pipe.add(roc_factor,'roc')
    pipe.add(beta,'beta')
    roc_filter = roc_factor > 0 # positive momentum only for long-only
    beta_filter = (beta > -0.5) & (beta < 0.5) # select a low beta filter
    # add the filtering
    pipe.set_screen(roc_filter & volume_filter & beta_filter)
    return pipe

def screener(context, data):
    try:
        pipeline_results = pipeline_output('my_screener')
    except NoFurtherDataError:
        log_info('no pipeline for {}'.format(get_datetime()))
        return []
    pipeline_results = pipeline_results.dropna() # remove NAs
    selected = pipeline_results.sort_values(
        'roc')[-(context.params['size']):] # pick up best N stocks
    # return the asset list, note the pipeline result dataframe has
    # assets (that survived the filters) as index and the computed
    # factors as the columns
    return selected.index.tolist()

def run_strategy(context, data):
    # get screened assets to buy
    assets = screener(context, data)
    current_holdings = context.portfolio.positions.keys() # current holdings
    exits = set(current_holdings) - set(assets) # assets to sell
    for asset in exits:
        # exit the exiting assets
        order_target_percent(asset, 0)
    if assets:
        # rebalance the new list with equal weights
        sizing = 1.0/len(assets)
        for asset in assets:
            order_target_percent(asset, sizing)
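The regression beta used in this example is the slope of each stock's returns on the market's returns, that is, their covariance divided by the variance of the market returns. The sketch below computes it column by column with plain numpy. It is a simplified stand-in, not the library's vectorized_beta: `columnwise_beta` is a hypothetical name, the independent returns are a flat 1D vector here, and missing values are not handled.

```python
import numpy as np

def columnwise_beta(dependent, independent):
    """OLS slope of each column of `dependent` on the 1D `independent`.

    dependent: array of shape (n_days, n_assets) of stock returns.
    independent: array of shape (n_days,) of market returns.
    Hypothetical helper for illustration; no missing-value handling.
    """
    x = independent - independent.mean()
    y = dependent - dependent.mean(axis=0)
    # covariance of each column with x, divided by the variance of x
    return (x[:, None] * y).sum(axis=0) / (x ** 2).sum()


# Made-up market returns and two synthetic stocks built from them.
market = np.array([0.01, -0.02, 0.015, 0.005, -0.01])
stocks = np.column_stack([2.0 * market, 0.5 * market + 0.001])

betas = columnwise_beta(stocks, market)

# Same band as the beta_filter in the example above.
low_beta = (betas > -0.5) & (betas < 0.5)
```

The synthetic stocks are exact multiples of the market, so the recovered betas are 2.0 and 0.5. Neither passes the strict band used in the example, since 0.5 sits on the boundary.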
Example 3 - Add a fundamental filter¶
In this example, we add some fundamental filters based on operating margin, avoiding stocks with very high or very low margins, which may indicate unstable businesses not suitable for quant strategies.
from blueshift.library.pipelines.pipelines import average_volume_filter, technical_factor
from blueshift.library.technicals.indicators import roc
from blueshift.pipeline import Pipeline
from blueshift.pipeline.factors.statistical import vectorized_beta
from zipline_pipeline.pipeline import CustomFactor
from blueshift.pipeline.data import EquityPricing
from blueshift.pipeline.data import QuarterlyFundamental, EquitySector
import warnings
import numpy as np
from blueshift.errors import NoFurtherDataError
from blueshift.api import order_target_percent, schedule_function, log_info
from blueshift.api import symbol, date_rules, time_rules, get_datetime
from blueshift.api import attach_pipeline, pipeline_output

def compute_beta(context, asset, lookback):
    class SimpleBeta(CustomFactor):
        inputs = [EquityPricing.close]
        def compute(self,today,assets,out,close):
            n = len(close)
            nbars = n + 10
            px = context.data_portal.history(asset,'close',nbars,'1d',dt=today)
            px = px.iloc[-n:].values
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=RuntimeWarning)
                independent = np.diff(px)/px[:(n-1)]
                independent = independent.reshape(n-1,1)
                dependent = np.diff(close, axis=0)/close[:(n-1)]
                out[:] = vectorized_beta(dependent, independent, int(n/4))
    return SimpleBeta(window_length = lookback)

def initialize(context):
    # The context variables can be accessed by other methods
    context.params = {'lookback':3,
                      'size':5,
                      'min_volume':1E7
                      }
    # Call rebalance function on the first trading day of each month
    schedule_function(run_strategy, date_rules.month_start(),
                      time_rules.market_close(minutes=30))
    # Set up the pipe-lines for strategies
    attach_pipeline(make_screener(context), name='my_screener')

def make_screener(context):
    pipe = Pipeline()
    # get the strategy parameters
    lookback = context.params['lookback']*21
    v = context.params['min_volume']
    # Set the volume filter - this is a builtin filter
    volume_filter = average_volume_filter(lookback, v)
    # add a beta filter
    market = symbol('NIFTY')
    beta = compute_beta(context, market, 60)
    # compute past returns - using a builtin factor
    roc_factor = technical_factor(lookback+5, roc, lookback)
    pipe.add(roc_factor,'roc')
    pipe.add(beta,'beta')
    roc_filter = roc_factor > 0 # positive momentum only for long-only
    beta_filter = (beta > -0.7) & (beta < 0.7) # select a low beta filter
    # add some fundamental filters
    ebitda = QuarterlyFundamental.ebitda.latest
    sales = QuarterlyFundamental.operating_income.latest
    margin = ebitda/sales
    # sector filter is disabled
    #sector = EquitySector.sector.latest
    profitability1 = margin > 0.05
    profitability2 = margin < 0.5
    # sector filter is disabled
    # sector_filter = sector.eq('Pharmaceuticals & Biotechnology')
    # add the filtering
    pipe.set_screen(roc_filter & volume_filter & beta_filter & profitability1 & profitability2)
    return pipe

def screener(context, data):
    try:
        pipeline_results = pipeline_output('my_screener')
    except NoFurtherDataError:
        log_info('no pipeline for {}'.format(get_datetime()))
        return []
    pipeline_results = pipeline_results.dropna() # remove NAs
    selected = pipeline_results.sort_values(
        'roc')[-(context.params['size']):] # pick up best N stocks
    # return the asset list, note the pipeline result dataframe has
    # assets (that survived the filters) as index and the computed
    # factors as the columns
    return selected.index.tolist()

def run_strategy(context, data):
    # get screened assets to buy
    assets = screener(context, data)
    current_holdings = context.portfolio.positions.keys() # current holdings
    exits = set(current_holdings) - set(assets) # assets to sell
    for asset in exits:
        # exit the exiting assets
        order_target_percent(asset, 0)
    if assets:
        # rebalance the new list with equal weights
        sizing = 1.0/len(assets)
        for asset in assets:
            order_target_percent(asset, sizing)
These examples show the potential of this very powerful API for large universe strategies. For more examples, check out the Jupyter notebook section.