Pipeline APIs - Large Universe
==============================

Quantitative investing approaches typically involve tracking a large
universe of instruments and computing trading signals for each of them.
On Blueshift, we use a special construct and a special set of APIs, known
as the ``pipeline`` APIs, to do this efficiently.

A pipeline can be thought of as a computation graph, defined by a
particular combination of operations called ``factors`` and ``filters``.
A factor is an operation that takes in some defined inputs and returns a
numerical value. A filter is similar to a factor, but returns a True or
False (boolean) value instead of a number.

The advantage of using a computation graph to tackle a repeated
computation for a large universe is that the computation can be lazy and
carried out in chunks. This means that when we create a pipeline or define
a factor or a filter, we do not do any computation. We just define how to
carry out the computation. The actual computation is done when we query
the pipeline for results.

Pipeline In Strategies
----------------------

Using the pipeline APIs in a Blueshift strategy involves three steps.
First, we need to define the pipeline object. This involves instantiating
a pipeline (the computation graph) and then adding factors and filters to
it. Second, we need to attach the pipeline object to the current strategy
by a name, so that we can refer to it later (and also so that the strategy
can run and cache the results efficiently). Finally, when we need the
result, we ask for it. Below is a sample that involves all of these steps.

.. code-block:: python

    from blueshift.api import schedule_function, date_rules, time_rules
    from blueshift.api import order_target, attach_pipeline, pipeline_output
    from blueshift.pipeline import Pipeline
    from blueshift.library.pipelines import period_returns, select_universe

    def make_pipeline(context):
        # create the pipeline and define the factors and filters
        pipe = Pipeline()
        monthly_rets = period_returns(20, offset=0)
        liquidity = select_universe(200, 100)
        pipe.add(monthly_rets,'momentum')
        pipe.set_screen(liquidity)
        return pipe

    def initialize(context):
        schedule_function(strategy, date_rules.month_start(),
                          time_rules.at('10:00'))
        # attach the pipeline to the strategy
        attach_pipeline(make_pipeline(context), 'strategy_pipe')

    def strategy(context, data):
        try:
            # request the pipeline results
            results = pipeline_output('strategy_pipe')
        except Exception as e:
            print(f'error in getting pipeline output:{str(e)}')
            # no results to work with, so stop here
            return

        print(f'number of assets {len(results.index)}')

In the above example, we select the top 100 stocks by 200-day average
dollar volume and compute the returns over the last 20 days for each of
them.

Building the pipeline object
+++++++++++++++++++++++++++++

To do that, first we define the pipeline. We import the ``Pipeline`` class
from `blueshift.pipeline` and use it to create an empty object in the
``make_pipeline`` function. Then we use a readymade factor
(``period_returns``) and a readymade filter (``select_universe``), both
imported from the ``blueshift.library.pipelines`` package, to achieve our
goal.

The `select_universe` function takes in a `lookback` period and a `size`
parameter. It filters the whole asset universe based on average dollar
volume (averaged over the lookback period) and keeps the top `size` assets.
In our case, this is the top 100 stocks based on 200-day average dollar
volume. The `period_returns` function takes in a `lookback` parameter and
an `offset` parameter and computes price returns between today-offset and
today-offset-lookback days.
With offset=0 and lookback=20, it gives us the 20-day returns counted back
from today.

Note that the above functions, `period_returns` and `select_universe`,
only define operations and do not perform any calculation yet. These
operations are then added to the empty pipeline object. The factor
operation is added using the ``add`` method of the pipeline object and the
filter operation is added using the ``set_screen`` method. To add multiple
factors, we can use the `add` method as many times as needed. To add
multiple filters, we can use the "&" and "|" operators to logically
combine them.

Finally, we return the pipeline object from the ``make_pipeline``
function. This object is passed to the ``attach_pipeline`` API function to
register the pipeline with the strategy, using a name we choose
(`strategy_pipe` in this case).

Fetching pipeline results
++++++++++++++++++++++++++

Once the pipeline is defined and registered with the strategy, we can call
the ``pipeline_output`` API function to get the current output from the
pipeline we have defined. In this case, it will return the 20-day returns
for the top 100 stocks for the day the result is requested.

.. important::
    All pipeline functionalities on Blueshift are based on daily data. It
    does not make sense, therefore, to call the pipeline computation
    (``pipeline_output``) more than once a day. Also, requesting results
    from a pipeline on day t will carry out all computation based on data
    up to the previous day (t-1), to ensure there is no look-ahead bias.

The return value from ``pipeline_output`` is always a dataframe. The index
of the dataframe is all the assets that have passed the filters we have
defined on the pipeline object (using ``set_screen``, see above), and the
columns of the dataframe are the factors we have added (using the ``add``
method, see above). If we do not add any filters, all assets in the
universe are chosen (i.e. will be in the index of the result). If we do not
add any factors, the dataframe itself will be empty.
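
The structure of the result can be illustrated with a toy model in plain
Python. This is only a sketch and not how Blueshift implements pipelines:
the ``ToyPipeline`` class and the helpers ``period_return`` and
``last_price_above_100`` are hypothetical names invented here, and a
dictionary of dictionaries stands in for the dataframe. It shows that
``add`` and ``set_screen`` only record operations, that the work happens
when results are requested, and that rows are the assets passing the
screen while columns are the named factors.

.. code-block:: python

    # Toy model of a lazily evaluated pipeline (illustration only,
    # not Blueshift code; all names here are hypothetical).

    class ToyPipeline:
        def __init__(self):
            self.columns = {}    # factor name -> function(prices) -> number
            self.screen = None   # function(prices) -> bool, None = no filter

        def add(self, factor, name):
            # Only record the operation; nothing is computed here.
            self.columns[name] = factor

        def set_screen(self, screen):
            # Only record the filter; nothing is computed here either.
            self.screen = screen

        def run(self, history):
            # history maps an asset to its daily closes, oldest first.
            result = {}
            for asset, prices in history.items():
                if self.screen is not None and not self.screen(prices):
                    continue  # asset rejected by the filter
                result[asset] = {name: factor(prices)
                                 for name, factor in self.columns.items()}
            return result


    def period_return(prices):
        # Simple return over the whole window.
        return prices[-1] / prices[0] - 1.0


    def last_price_above_100(prices):
        # Boolean result, so this plays the role of a filter.
        return prices[-1] > 100


    history = {
        "AAA": [100.0, 110.0, 120.0],
        "BBB": [50.0, 55.0, 60.0],
        "CCC": [200.0, 190.0, 180.0],
    }

    pipe = ToyPipeline()
    pipe.add(period_return, "momentum")
    pipe.set_screen(last_price_above_100)

    # The computation happens only now, when results are requested.
    output = pipe.run(history)
    for asset, row in output.items():
        print(asset, row)

Here ``BBB`` is dropped by the screen, and each remaining row has a single
``momentum`` entry. Without a screen, all three assets would be returned.
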
The column name for each factor added will be the name passed in the
``add`` method of the pipeline object while defining it.

Some useful filters and factors
--------------------------------

The blueshift library has some useful filters and factors readily
available.

.. autofunction:: blueshift.library.pipelines.select_universe
    :noindex:

.. autofunction:: blueshift.library.pipelines.average_volume_filter
    :noindex:

.. autofunction:: blueshift.library.pipelines.period_returns
    :noindex:

.. autofunction:: blueshift.library.pipelines.technical_factor
    :noindex:

These are the most commonly used factors and filters, conveniently wrapped
in a function call. Besides these, there are many built-in factors and
filters that you can directly import and instantiate, or you can create
your own custom filters or factors.

Custom Filter and Factors
--------------------------

To use custom filters or factors, we need to define a new class (based on
the respective base class) and implement the ``compute`` method of the
class, as shown below.

.. code-block:: python

    from blueshift.pipeline import Pipeline, CustomFilter, CustomFactor
    from blueshift.pipeline.data import EquityPricing

    class CloseAboveHighFilter(CustomFilter):
        inputs = [EquityPricing.close, EquityPricing.high]

        def compute(self,today,assets,out,close_price, high_price):
            # boolean output: True where the condition holds
            out[:] = (close_price[-1] > high_price[-2])

    class CloseAboveHighFactor(CustomFactor):
        inputs = [EquityPricing.close, EquityPricing.high]

        def compute(self,today,assets,out,close_price, high_price):
            # numerical output: the size of the difference
            out[:] = (close_price[-1] - high_price[-2])

    def make_pipeline(context):
        pipe = Pipeline()
        myfilter = CloseAboveHighFilter(window_length=10)
        myfactor = CloseAboveHighFactor(window_length=10)
        pipe.add(myfactor,'close_high_diff')
        pipe.set_screen(myfilter)
        return pipe

In the above code snippet, we use the same computation for both classes: a
comparison of the last close (``close_price[-1]``) with the high one row
earlier in the window (``high_price[-2]``).
We define a filter and a factor based on this to highlight their
implementation and differences.

For the custom filter, we define the class `CloseAboveHighFilter` based on
the ``CustomFilter`` class. We also import ``EquityPricing`` to define the
pricing columns to be used in the class. The ``inputs`` attribute lists the
required field(s). The ``compute`` method is automatically passed a few
variables:

1. `today`: the date of computation.
2. `assets`: the list of all asset objects alive on that date.
3. `out`: the output numpy array that holds the results of the computation
   (a 1D array of size equal to the number of assets).

The `compute` method also receives one extra argument for each column
listed in ``inputs`` (in the order they are defined). This is exactly the
same for both the filter and the factor implementation. The only difference
is that in the filter implementation the results are boolean, and in the
factor implementation they are numbers.

Once we have defined the classes, we can use them in building the pipeline
by instantiating them with a window length. The window length determines
how many days of data are passed to the ``compute`` method for each of the
extra (pricing) inputs. For example, if we assume that on the computation
date a total of 2000 assets were available to trade, the `close_price` and
`high_price` arguments will each be a numpy array of shape (10, 2000): one
row per day in the window and one column per asset. This is why
``close_price[-1]`` yields one value per asset, matching the size of `out`.
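
The shapes involved can be checked outside Blueshift with plain numpy. The
sketch below is not Blueshift code: it fabricates random prices for a
hypothetical 5 assets over a 10-day window, assumes the window arrays are
laid out with one row per day (oldest first) and one column per asset, and
applies the same expressions as the two ``compute`` methods above.

.. code-block:: python

    import numpy as np

    window_length, n_assets = 10, 5   # hypothetical sizes for illustration
    rng = np.random.default_rng(0)

    # Assumed layout: one row per day (oldest first), one column per asset.
    close_price = rng.uniform(90.0, 110.0, size=(window_length, n_assets))
    high_price = close_price + rng.uniform(0.0, 5.0,
                                           size=(window_length, n_assets))

    # Output buffers with one entry per asset, like the `out` argument.
    filter_out = np.empty(n_assets, dtype=bool)
    factor_out = np.empty(n_assets, dtype=float)

    # Same expressions as in the compute methods of the custom classes.
    filter_out[:] = close_price[-1] > high_price[-2]
    factor_out[:] = close_price[-1] - high_price[-2]

    print(close_price.shape)   # window array: (days, assets)
    print(filter_out)          # one boolean per asset
    print(factor_out)          # one number per asset

The filter output is True exactly where the factor output is positive,
which is the only difference between the two implementations: one reports
whether the condition holds, the other reports by how much.
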