Handling a Large Universe on Blueshift¶
A typical quant trading problem¶
As we have seen before, quant trading typically involves tracking a large universe of instruments and computing our trading signals on each one of them. We use a special construct and a special set of API functions on Blueshift® to do this efficiently.
Pipelines on Blueshift¶
We use what is called pipelines for such cases. A pipeline can be
thought of as a
computation graph, defined by a particular
combination of some operations called factors and filters. A factor is an operation
that takes in some defined inputs and returns a numerical value. A filter
is similar to a factor, but returns a True or False (boolean) value instead
of a number. The advantage of using a computation graph to tackle a
repeated computation for a large universe is that computation can be
lazy
and carried out in chunks. This means that when we create a pipeline
or define a factor or a filter, we do not do any computation. We
just define how to carry out the computation. The actual computation is
done when we query the pipeline for results.
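The idea of deferred (lazy) computation can be illustrated with a small pure-Python sketch. This is not Blueshift code; the `Node` class and the names below are made up purely for illustration:

```python
# A minimal sketch of lazy evaluation: defining a node only records the
# operation; nothing is computed until we explicitly query the graph.
class Node:
    def __init__(self, func, *deps):
        self.func = func
        self.deps = deps

    def evaluate(self):
        # Computation happens only here, when results are requested,
        # walking the graph from the leaves up.
        return self.func(*(d.evaluate() for d in self.deps))

# Leaves wrap constants; interior nodes wrap operations.
price = Node(lambda: 101.0)
volume = Node(lambda: 5000)
dollar_volume = Node(lambda p, v: p * v, price, volume)

# No multiplication has run yet; it runs only on this query:
result = dollar_volume.evaluate()  # 505000.0
```

A pipeline works on the same principle, except that its nodes (factors and filters) operate on arrays of data across the whole universe of assets.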
Info
All pipeline functionalities on Blueshift® are based on
daily data. It does not make sense, therefore, to call the pipeline
computation (pipeline_output, see below) more than once a day. The
implementation is based on zipline Pipeline.
Defining factors or filters¶
These are the building blocks of our computation graph, i.e. pipeline.
We do have some ready-made factors and filters available, but oftentimes we have to define our own. To see how this can be done, let
us examine the code snippet below.
import numpy as np
from blueshift.pipeline import CustomFilter
from blueshift.pipeline.data import EquityPricing

def average_volume_filter(lookback, amount):
    class AvgDailyDollarVolumeTraded(CustomFilter):
        inputs = [EquityPricing.close, EquityPricing.volume]
        def compute(self, today, assets, out, close_price, volume):
            dollar_volume = np.mean(close_price * volume, axis=0)
            high_volume = dollar_volume > amount
            out[:] = high_volume
    return AvgDailyDollarVolumeTraded(window_length=lookback)
Here we first import the class CustomFilter from the pipeline module and another
class called EquityPricing from the pipeline.data module. The first one
provides us with an implementation to define a filter as described above. The
latter provides us with the boilerplate to define which input columns
this filter will accept. EquityPricing implements the standard pricing
columns - namely open, high, low, close and volume.
To define a custom filter, all we have to do is redefine the function
compute as we see fit, to produce the filter result we want. The function
compute is a member of the base class CustomFilter (as well as
CustomFactor in case we use that). Arguments to the compute function
are as follows (ignoring the self variable which is just the class
instance itself):
today: This is the date of the computation. This is automatically provided by the platform when we run the pipeline.
assets: This is the list of all assets that are available to trade on the day the computation is invoked (today), again provided by the platform. This will be a numpy array of length N, where N is the number of assets available as of today.
out: A numpy array with size equal to the number of assets in assets (N). This is where we store the results of the computation - one value (number or boolean) for each asset. We can add many factors to a pipeline. The pipeline will combine all of them into a multi-column array, one column for each factor corresponding to its out values, indexed by assets. When we query this pipeline from our strategy for a factor, the corresponding column is automatically returned.
The rest of the variables depend on what we want. In this example above,
we want the inputs (which is a class variable) to have the columns close
and volume from the standard pricing data. So the last two variables in
the compute function will refer to them (we happen to name them as
close_price and volume, but we could have given any names). These are
also numpy arrays. The row length of the pricing data in these
specified columns will be equal to the window_length variable of the
class instance. This represents the number of bars (always daily) we
will use to do our computation. The number of columns will be equal to the number of
assets available as of today.
In the main body of the compute function, we calculate the average daily
dollar volume for each asset, and compare it against a threshold amount to return
a True or False value - signifying whether the traded volume is high enough.
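To make the shapes concrete, here is a small numpy sketch of what the body of compute does, using made-up numbers: close_price and volume arrive as (window_length, N) arrays, and out receives one value per asset:

```python
import numpy as np

# Hypothetical inputs: window_length = 3 daily bars, N = 4 assets.
# Rows are days, columns are assets.
close_price = np.array([[10.0, 20.0, 5.0, 1.0],
                        [11.0, 19.0, 5.0, 1.0],
                        [12.0, 21.0, 5.0, 1.0]])
volume = np.array([[1000, 500, 10, 100],
                   [1100, 400, 12, 100],
                   [ 900, 600,  8, 100]])
amount = 5000.0

# Same computation as in compute: average dollar volume per asset
# (axis=0 averages over the lookback window), then threshold it.
dollar_volume = np.mean(close_price * volume, axis=0)
out = dollar_volume > amount   # one boolean per asset
```

With these numbers, the first two assets pass the filter and the last two do not.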
The wrapper function¶
In the code snippet above, we implement a custom filter to flag assets whose
average daily dollar volume exceeds amount - by defining a class AvgDailyDollarVolumeTraded
which inherits from the base filter implementation CustomFilter. The
top level function wraps this class definition with the arguments (i.e.
lookback that we set as the window_length and amount that we filter
on). It also conveniently creates an instance of the class and returns
it.
Custom factors¶
These are very similar to the filter above, except that the returned value
will be numerical, and we have to derive the custom class from CustomFactor
instead of CustomFilter. The next section
has an example.
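For instance, the filter above becomes a factor if compute writes the numerical dollar volume itself into out instead of a boolean. Below is a sketch using a trivial stand-in for the base class (the real CustomFactor lives in blueshift.pipeline; on the platform you would also set inputs exactly as in the filter example):

```python
import numpy as np

# Stand-in for blueshift.pipeline.CustomFactor, for illustration only;
# on Blueshift you would import the real class instead.
class CustomFactor:
    def __init__(self, window_length):
        self.window_length = window_length

def average_volume_factor(lookback):
    class AvgDailyDollarVolume(CustomFactor):
        # On Blueshift: inputs = [EquityPricing.close, EquityPricing.volume]
        def compute(self, today, assets, out, close_price, volume):
            # Identical to the filter, except we store the number itself.
            out[:] = np.mean(close_price * volume, axis=0)
    return AvgDailyDollarVolume(window_length=lookback)

# Simulate how the platform would call compute (2 assets, 3 bars):
factor = average_volume_factor(3)
close = np.array([[10.0, 20.0], [11.0, 19.0], [12.0, 21.0]])
vol = np.array([[100, 50], [100, 50], [100, 50]])
out = np.empty(2)
factor.compute(None, None, out, close, vol)
```

The platform fills in today, assets and the input columns for us; here we pass mock values by hand only to show the mechanics.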
Building the pipeline¶
The above code snippet defines the operation of the filter. To use this,
we need to define the computation graph, i.e. the pipeline itself.
This can typically be done by the following code snippet:
from blueshift.pipeline import Pipeline
...
def make_strategy_pipeline(context):
    pipe = Pipeline()

    # get the strategy parameters defined in initialize
    lookback = context.params['lookback']
    amount = context.params['min_volume']

    # set the volume filter
    volume_filter = average_volume_filter(lookback, amount)
    pipe.add(volume_filter, 'high_volume')

    return pipe
Here, we first create an empty pipeline using the constructor Pipeline.
Then we invoke our function average_volume_filter just defined above.
Finally, we add this operation to the pipeline and return it. When we
query the pipeline later, the results are dynamically computed and
returned under the column high_volume (passed as the second argument
to the add method).
The pipeline construction function, like make_strategy_pipeline above
is typically called once, in the initialize function. However, building
the pipeline is not sufficient on its own. We also need to attach the
pipeline to the current strategy. All this is done using the
attach_pipeline API function as below:
from blueshift.api import attach_pipeline
def initialize(context):
    ... # other related code

    # set up the pipeline for the strategy
    attach_pipeline(make_strategy_pipeline(context),
                    name='strategy_pipeline')
The first argument is the pipeline instance we built. The second argument
strategy_pipeline is the name we can refer to it by later in our
strategy code, as shown below.
Querying the pipeline¶
Once we have defined our filters and factors and built the pipeline
too, we are now ready to use it. This is typically done using the API
function pipeline_output as below:
from blueshift.api import pipeline_output
...
def handle_data(context, data):
    pipeline_results = pipeline_output('strategy_pipeline')
    print(pipeline_results.index)
Here we query the pipeline attached under the name strategy_pipeline. The returned
result pipeline_results is a dataframe, indexed by the assets, with
a column for each factor added during the pipeline construction
using the add method. In the above example, it will have a single
column named high_volume. These calls to pipeline_output trigger the
actual computation of the pipeline.
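The shape of the result can be sketched with pandas. The data and ticker names below are mock values, since real results come from the platform: one row per asset, one column per added term, which we can then filter on:

```python
import pandas as pd

# Mock of what pipeline_output('strategy_pipeline') might return:
# one row per asset, one column per term added with pipe.add.
pipeline_results = pd.DataFrame(
    {'high_volume': [True, False, True]},
    index=['AAPL', 'MSFT', 'INFY'])

# Typical usage: keep only the assets that pass the filter.
candidates = pipeline_results[pipeline_results.high_volume].index.tolist()
```

A strategy would then place orders only on the assets in candidates.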
Combining factors¶
We can add multiple factors using the add method during the pipeline
construction, each becoming a separate column. But many times we may
need to combine factors in a single column. We can do that by using
standard arithmetic operators, as in the code below:
...
factor1 = first_factor(some_params)
factor2 = second_factor(other_params)
combined_factor = factor1 + factor2
Here, factor1 and factor2 define the factors, i.e. some
operations. The combined factor combined_factor is also a factor, that
is defined as a combination of the others. Note, we do not carry out any
computations here. The + operator is NOT operating on numbers, but on
instances of type CustomFactor. The platform has the standard arithmetic
operators overloaded (including +), so that we can combine factors
in this way. The actual computation is done when we query the pipeline.
For filters, we use the & (and), | (or) and ~ (not) operators for
combining. This is natural, as they are boolean operations by design.
...
filter1 = first_filter(some_params)
filter2 = second_filter(other_params)
combined_filter = filter1 & filter2
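The operator-overloading trick can be illustrated in a few lines of plain Python. This is a toy, not the actual Blueshift classes: __add__ returns a new object that remembers its operands, so + builds the graph instead of computing a number:

```python
class Factor:
    def __init__(self, compute):
        self.compute = compute  # deferred computation, a no-arg callable

    def __add__(self, other):
        # '+' does not compute anything; it returns a new Factor whose
        # computation is defined in terms of the two operands.
        return Factor(lambda: self.compute() + other.compute())

factor1 = Factor(lambda: 2.0)
factor2 = Factor(lambda: 3.0)
combined = factor1 + factor2   # still a Factor; nothing computed yet

value = combined.compute()     # computation happens only now: 5.0
```

Filters overload the boolean operators (&, |, ~) in the same spirit, returning new filter nodes rather than True/False values.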
Combining factor and filter¶
Finally, when we need to combine both a factor and a filter, we use
the add method for the factor and the set_screen method to add
the filter, as below:
...
pipe = Pipeline()
filter1 = my_filter(some_params)
factor = my_factor(other_params)
pipe.add(factor, "some_factor_name")
pipe.set_screen(filter1)
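With mock data, the effect of set_screen can be pictured as follows: the factor column is computed for all assets, but rows where the screen filter is False are dropped from the queried result. The asset names and values below are made up for illustration:

```python
import pandas as pd

# Factor values for all assets, plus the screen filter's booleans.
factor_values = pd.Series([0.5, -0.2, 1.3], index=['A', 'B', 'C'])
screen = pd.Series([True, False, True], index=['A', 'B', 'C'])

# set_screen(filter1) means pipeline_output returns only screened rows:
result = pd.DataFrame({'some_factor_name': factor_values})[screen]
```

Asset 'B' is dropped entirely, while the surviving rows keep their factor values under the column name passed to add.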
Strategy example using pipeline¶
A simple strategy example using pipeline is available in the default
templates when you create a strategy - under the name SMA Pipeline.
For a more advanced example, continue to the next section.