Handling a Large Universe on Blueshift¶
A typical quant trading problem¶
As we have seen before, quant trading typically involves tracking a large universe of instruments and computing our trading signals on each one of them. We use a special construct and a special set of API functions on Blueshift® to do this efficiently.
Pipelines on Blueshift¶
We use what is called a pipeline for such cases. A pipeline can be thought of as a computation graph, defined by a particular combination of operations called factors and filters. A factor is an operation that takes in some defined inputs and returns a numerical value. A filter is similar to a factor, but returns a True or False (boolean) value instead of a number. The advantage of using a computation graph to tackle a repeated computation for a large universe is that the computation can be lazy and carried out in chunks. This means that when we create a pipeline, or define a factor or a filter, we do not do any computation. We just define how to carry out the computation. The actual computation is done when we query the pipeline for results.
Info
All pipeline functionalities on Blueshift® are based on daily data. It does not make sense, therefore, to call the pipeline computation (pipeline_output, see below) more than once a day. The implementation is based on zipline Pipeline.
Defining factors or filters¶
These are the building blocks of our computation graph, i.e. the pipeline. We do have some readymade factors and filters available, but oftentimes we have to define our own. To see how this can be done, let us scrutinize the code snippet below.
import numpy as np
from blueshift.pipeline import CustomFilter
from blueshift.pipeline.data import EquityPricing

def average_volume_filter(lookback, amount):
    class AvgDailyDollarVolumeTraded(CustomFilter):
        inputs = [EquityPricing.close, EquityPricing.volume]

        def compute(self, today, assets, out, close_price, volume):
            dollar_volume = np.mean(close_price * volume, axis=0)
            high_volume = dollar_volume > amount
            out[:] = high_volume

    return AvgDailyDollarVolumeTraded(window_length=lookback)
In the snippet above, first we import the class CustomFilter from the pipeline module, and another class called EquityPricing from the pipeline.data module. The first one provides us with an implementation to define a filter as described above. The latter provides us with the boilerplate to define which input columns this filter will accept. EquityPricing implements the standard pricing columns, namely open, high, low, close and volume.
To define a custom filter, all we have to do is re-define the function compute as we see fit, to return the filtering that we wish. The function compute is a member of the base class CustomFilter (as well as of CustomFactor, in case we use that). The arguments to the compute function are as follows (ignoring the self variable, which is just the class instance itself):
today: This is the date of the computation. It is automatically provided by the platform when we run the pipeline.

assets: This is the list of all assets that are available to trade on the day the computation is invoked (today), again provided by the platform. This will be a numpy array of length N, where N is the number of assets available as of today.

out: A numpy array of size equal to the number of assets in assets (N). This is where we store the results of the computation, one value (number or boolean) for each asset. We can add many factors in a pipeline. The pipeline will combine all of them into a multi-column array, one column for each factor, corresponding to its out values, indexed by assets. When we query this pipeline from our strategy for a factor, the corresponding column is automatically returned.
The rest of the arguments depend on what we want. In the example above, we want the inputs (which is a class variable) to have the columns close and volume from the standard pricing data. So the last two arguments of the compute function will refer to them (we happen to name them close_price and volume, but we could have given them any names). These are also numpy arrays. The number of rows of the pricing data in these specified columns will be equal to the window_length variable of the class instance. This represents the number of bars (always daily) we will use to do our computation. The number of columns will be equal to the number of assets available as of today.
In the main body of the compute function, we calculate the average daily dollar volume for each asset, and compare it to a particular threshold amount to return a True or False value, signifying whether the volume is high enough.
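The axis handling above can be checked with a small NumPy sketch outside the platform (the numbers below are made up for illustration): close_price and volume arrive as window_length-by-N arrays, and axis=0 averages over time for each asset.

```python
import numpy as np

# Toy data: window_length = 3 rows (days), N = 2 columns (assets)
close_price = np.array([[10.0, 100.0],
                        [11.0, 101.0],
                        [12.0, 102.0]])
volume = np.array([[1000, 10],
                   [1100, 20],
                   [1200, 30]])

# Average daily dollar volume per asset: mean over the time axis
dollar_volume = np.mean(close_price * volume, axis=0)
# dollar_volume -> [12166.67, 2026.67] (one value per asset)

amount = 5000
high_volume = dollar_volume > amount
# high_volume -> [True, False]
```

Note that the result has one entry per asset, which is exactly the shape the out array expects.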
The wrapper function¶
In the code snippet above, we implement a custom filter to compute the average daily dollar volume (and check that it is greater than amount) by defining a class AvgDailyDollarVolumeTraded, which inherits from the base filter implementation CustomFilter. The top-level function wraps this class definition with the arguments (i.e. lookback, which we set as the window_length, and amount, which we filter on). It also conveniently creates an instance of the class and returns the same.
Custom factors¶
These are very similar to the filter above, except that the returned value will be numerical, and we have to derive the custom class from CustomFactor instead of CustomFilter. The next section has an example.
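For instance, a factor version of the dollar-volume computation might look like the sketch below, following the same pattern as the filter above; the function and class names here are our own, not part of the platform API.

```python
import numpy as np
from blueshift.pipeline import CustomFactor
from blueshift.pipeline.data import EquityPricing

def average_volume_factor(lookback):
    class AvgDailyDollarVolume(CustomFactor):
        inputs = [EquityPricing.close, EquityPricing.volume]

        def compute(self, today, assets, out, close_price, volume):
            # Same computation as the filter, but we write the
            # numerical values directly instead of a boolean test
            out[:] = np.mean(close_price * volume, axis=0)

    return AvgDailyDollarVolume(window_length=lookback)
```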
Building the pipeline¶
The above code snippet defines the operation of the filter. To use it, we need to define the computation graph, i.e. the pipeline itself. This can typically be done with the following code snippet:
from blueshift.pipeline import Pipeline
...

def make_strategy_pipeline(context):
    pipe = Pipeline()

    # get the strategy parameters defined in initialize
    lookback = context.params['lookback']
    amount = context.params['min_volume']

    # set the volume filter
    volume_filter = average_volume_filter(lookback, amount)
    pipe.add(volume_filter, 'high_volume')

    return pipe
Here, first we create an empty pipeline using the constructor Pipeline. Then we invoke our function average_volume_filter defined just above. Finally, we add this operation to the pipeline and return it. When we query the pipeline later, the results are dynamically computed and returned under the column high_volume (passed as the second argument to the add method).
The pipeline construction function, like make_strategy_pipeline above, is typically called once, in the initialize function. However, it is not sufficient to build the pipeline. We also need to attach the pipeline to the current strategy. All this is done using the attach_pipeline API function as below:
from blueshift.api import attach_pipeline

def initialize(context):
    ... # other related code

    # set up the pipeline for the strategy
    attach_pipeline(make_strategy_pipeline(context),
                    name='strategy_pipeline')
The first argument is the pipeline instance we built. The second argument, strategy_pipeline, is the name we can refer to it by later in our strategy code, as shown below.
Querying the pipeline¶
Once we have defined our filters and factors, and built the pipeline too, we are ready to use it. This is typically done using the API function pipeline_output, as below:
from blueshift.api import pipeline_output
...

def handle_data(context, data):
    pipeline_results = pipeline_output('strategy_pipeline')
    print(pipeline_results.index)
Here we query the pipeline attached under the name strategy_pipeline. The returned result pipeline_results is a pandas DataFrame, indexed by the assets, with a column for each factor added during the pipeline construction using the add method. In the above example, it will have a single column named high_volume. These calls to pipeline_output trigger the actual computation of the pipeline.
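Since the result is indexed by assets, extracting the shortlisted assets is a simple boolean selection. Below is a toy sketch with made-up values, where a hand-built DataFrame stands in for the pipeline_output result:

```python
import pandas as pd

# Stand-in for what pipeline_output('strategy_pipeline') returns:
# rows indexed by asset, one column per entry added to the pipeline
pipeline_results = pd.DataFrame(
    {'high_volume': [True, False, True]},
    index=['ASSET_A', 'ASSET_B', 'ASSET_C'],
)

# Keep only the assets that passed the volume filter
shortlist = pipeline_results[pipeline_results['high_volume']].index.tolist()
# shortlist -> ['ASSET_A', 'ASSET_C']
```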
Combining factors¶
We can add multiple factors using the add method during the pipeline construction, each becoming a separate column. But many times we may need to combine factors into a single column. We can do that using the standard arithmetic operators, as in the code below:
...
factor1 = first_factor(some_params)
factor2 = second_factor(other_params)
combined_factor = factor1 + factor2
Here factor1 and factor2 define the factors, i.e. some operations. The combined factor combined_factor is also a factor, defined as a combination of the others. Note, we do not carry out any computations here. The + operator is NOT operating on numbers, but on instances of type CustomFactor. The platform has the standard arithmetic operators overloaded (including +), so that we can combine factors in this way. The actual computation is done when we query the pipeline.
For filters, we use the & (and), | (or) and ~ (not) operators for combining. This is natural, as filters are boolean operations by design.
...
filter1 = first_filter(some_params)
filter2 = second_filter(other_params)
combined_filter = filter1 & filter2
Combining factor and filter¶
Finally, when we need to combine both a factor and a filter, we use the add method for the factor, and the set_screen method to add the filter, as below:
...
pipe = Pipeline()
filter1 = my_filter(some_params)
factor = my_factor(other_params)
pipe.add(factor, "some_factor_name")
pipe.set_screen(filter1)
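Putting the pieces together, a complete construction function for a screened pipeline might look like the sketch below. Here my_factor is a hypothetical factor wrapper (following the wrapper pattern shown earlier), and average_volume_filter is the filter defined above; in zipline-style pipelines, setting a screen restricts the output rows to the assets that pass the filter.

```python
from blueshift.pipeline import Pipeline

def make_screened_pipeline(context):
    # parameter names follow the earlier examples
    lookback = context.params['lookback']
    amount = context.params['min_volume']

    pipe = Pipeline()
    # my_factor is a stand-in for any custom factor wrapper
    pipe.add(my_factor(lookback), 'my_factor_column')
    # only assets passing this filter appear in the query results
    pipe.set_screen(average_volume_filter(lookback, amount))
    return pipe
```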
Strategy example using pipeline¶
A simple strategy example using pipeline is available in the default
templates when you create a strategy - under the name SMA Pipeline
.
For a more advanced example, continue to the next section.