Pipeline APIs - Large Universe
==============================

Quantitative investing approaches typically involve tracking a large universe of instruments and computing trading signals on each one of them. On Blueshift, we use a special construct and a special set of APIs - known as the ``pipeline`` APIs - to do this efficiently.

A pipeline can be thought of as a computation graph, defined by a particular combination of operations called ``factors`` and ``filters``. A factor is an operation that takes in some defined inputs and returns a numerical value. A filter is similar to a factor, but returns a True or False (boolean) value instead of a number.

The advantage of using a computation graph for a repeated computation over a large universe is that the computation can be lazy and carried out in chunks. This means that when we create a pipeline or define a factor or a filter, we do not do any computation. We just define how to carry out the computation. The actual computation is done when we query the pipeline for results.

Pipeline In Strategies
----------------------

Using the pipeline APIs in a Blueshift strategy involves three steps. First, we define the pipeline object. This involves instantiating a pipeline (the computation graph) and then adding factors and filters to it. Second, we attach the pipeline object to the current strategy under a name, so that we can refer to it later (and also so that the strategy can run and cache the results efficiently). Finally, when we need the result, we ask for it. Below is a sample that involves all of these steps.

.. code-block:: python

    from blueshift.api import schedule_function, date_rules, time_rules
    from blueshift.api import order_target, attach_pipeline, pipeline_output
    from blueshift.pipeline import Pipeline
    from blueshift.library.pipelines import period_returns, select_universe

    def make_pipeline(context):
        # create the pipeline and define the factors and filters
        pipe = Pipeline()
        monthly_rets = period_returns(20, offset=0)
        liquidity = select_universe(200, 100)
        pipe.add(monthly_rets, 'momentum')
        pipe.set_screen(liquidity)
        return pipe

    def initialize(context):
        schedule_function(strategy, date_rules.month_start(),
                          time_rules.at('10:00'))
        # attach the pipeline to the strategy
        attach_pipeline(make_pipeline(context), 'strategy_pipe')

    def strategy(context, data):
        try:
            # request the pipeline results
            results = pipeline_output('strategy_pipe')
        except Exception as e:
            print(f'error in getting pipeline output: {str(e)}')
            return
        print(f'number of assets {len(results.index)}')

In the above example, we select the top 100 stocks based on 200-day average dollar volume and compute the returns over the last 20 days for each one of them.

Building the pipeline object
+++++++++++++++++++++++++++++

First, we define the pipeline. We import the ``Pipeline`` class from ``blueshift.pipeline`` and use it to create an empty object in the ``make_pipeline`` function. Then we use a ready-made factor (``period_returns``) and a ready-made filter (``select_universe``), both imported from the ``blueshift.library.pipelines`` package, to achieve our goal.

The ``select_universe`` filter takes in a ``lookback`` period and a ``size`` parameter. It screens the whole asset universe based on average dollar volume (averaged over the lookback period) and returns the top ``size`` count of assets. In our case, this is the top 100 stocks based on 200-day average dollar volume.

The ``period_returns`` factor takes in a ``lookback`` parameter and an ``offset`` parameter and computes price returns between ``today - offset`` and ``today - offset - lookback`` days. With ``offset=0`` and ``lookback=20``, it gives us the 20-day returns ending today.
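
For instance, here is a minimal sketch of an alternative pipeline definition (the function name and parameter values are purely illustrative): it keeps the top 50 stocks by roughly one-year average dollar volume and measures 5-day returns skipping the most recent day.

.. code-block:: python

    from blueshift.pipeline import Pipeline
    from blueshift.library.pipelines import period_returns, select_universe

    def make_alt_pipeline(context):
        # top 50 assets by dollar volume averaged over ~252 trading days
        liquidity = select_universe(252, 50)
        # 5-day returns measured up to the previous day (offset=1 skips today)
        recent_rets = period_returns(5, offset=1)

        pipe = Pipeline()
        pipe.add(recent_rets, 'recent_returns')
        pipe.set_screen(liquidity)
        return pipe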

Note that the functions above - ``period_returns`` and ``select_universe`` - only define operations and do not carry out any calculations (not yet). These operations are then added to the empty pipeline object. The factor operation is added using the ``add`` method of the pipeline object, and the filter operation is added using the ``set_screen`` method. To add multiple factors, we can call the ``add`` method as many times as needed. To apply multiple filters, we can combine them logically using the ``&`` and ``|`` operators before passing the result to ``set_screen`` (see the combined sketch further below).

Finally, we return the pipeline object from the ``make_pipeline`` function. This object is passed to the ``attach_pipeline`` API function to register the pipeline with the strategy, under a name we choose (``strategy_pipe`` in this case).

Fetching pipeline results
++++++++++++++++++++++++++

Once the pipeline is defined and registered with the strategy, we can call the ``pipeline_output`` API function to get the current output of the pipeline we have defined. In this case, it returns the 20-day returns for the top 100 stocks as of the day the result is requested.

.. important::

    All pipeline functionalities on Blueshift are based on daily data. It does not make sense, therefore, to call the pipeline computation (``pipeline_output``) more than once a day. Also, requesting results from a pipeline on day t carries out all computations based on data up to the previous day (t-1), to ensure there is no look-ahead bias.

The return value from ``pipeline_output`` is always a dataframe. The index of the dataframe is the set of assets that have passed the filters we defined (using ``set_screen``, see above), and the columns of the dataframe are the factors we added (using the ``add`` method, see above). If we do not add any filters, all assets in the universe are chosen (i.e. they will all be in the index of the result). If we do not add any factors, the dataframe itself will be empty. The column name for each factor is the name passed to the ``add`` method of the pipeline object when the factor was added.

Some useful filters and factors
--------------------------------

The Blueshift library has some useful filters and factors readily available.

.. autofunction:: blueshift.library.pipelines.select_universe
    :noindex:

.. autofunction:: blueshift.library.pipelines.average_volume_filter
    :noindex:

.. autofunction:: blueshift.library.pipelines.period_returns
    :noindex:

.. autofunction:: blueshift.library.pipelines.technical_factor
    :noindex:

These are usually the most commonly used factors and filters and are conveniently wrapped in a function call. Besides these, there are many built-in factors and filters that you can directly import and instantiate, or you can create your own custom filters or factors.
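
To illustrate the point made earlier - adding multiple factors with repeated ``add`` calls and combining filters with the ``&`` operator - here is a minimal sketch using the library functions listed in this section (the function name, lookbacks and thresholds are placeholders, not recommendations).

.. code-block:: python

    from blueshift.pipeline import Pipeline
    from blueshift.library.pipelines import (average_volume_filter,
                                             period_returns, select_universe)

    def make_combined_pipeline(context):
        pipe = Pipeline()

        # two factors: short-term and medium-term returns
        rets_1m = period_returns(20, offset=0)
        rets_3m = period_returns(60, offset=0)
        pipe.add(rets_1m, 'returns_1m')
        pipe.add(rets_3m, 'returns_3m')

        # filters: liquidity screens plus a condition derived from a factor
        liquid = average_volume_filter(200, 1E7)    # average traded value above 10M
        top_universe = select_universe(200, 200)    # top 200 by dollar volume
        positive = rets_3m > 0                      # keep positive momentum only

        # combine the filters logically and set them as the screen
        pipe.set_screen(liquid & top_universe & positive)
        return pipe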

Custom Filters and Factors
---------------------------

To use a custom filter or factor, we need to define a new class (based on the respective base class) and implement the ``compute`` method of the class, as shown below.

.. code-block:: python

    from blueshift.pipeline import Pipeline, CustomFilter, CustomFactor
    from blueshift.pipeline.data import EquityPricing

    class CloseAboveHighFilter(CustomFilter):
        inputs = [EquityPricing.close, EquityPricing.high]

        def compute(self, today, assets, out, close_price, high_price):
            out[:] = (close_price[-1] > high_price[-2])

    class CloseAboveHighFactor(CustomFactor):
        inputs = [EquityPricing.close, EquityPricing.high]

        def compute(self, today, assets, out, close_price, high_price):
            out[:] = (close_price[-1] - high_price[-2])

    def make_pipeline(context):
        pipe = Pipeline()
        myfilter = CloseAboveHighFilter(window_length=10)
        myfactor = CloseAboveHighFactor(window_length=10)
        pipe.add(myfactor, 'close_high_diff')
        pipe.set_screen(myfilter)
        return pipe

In the above code snippet, we use the same computation - a comparison of the last close against the high price two days before - to define both a filter and a factor, to highlight their implementation and their differences.

For the custom filter, we define the class ``CloseAboveHighFilter`` based on the ``CustomFilter`` class, and for the custom factor, ``CloseAboveHighFactor`` based on the ``CustomFactor`` class. We also import ``EquityPricing`` to define the pricing columns to be used in the classes. The ``inputs`` attribute lists the required field(s). The ``compute`` method automatically gets passed a few variables, namely 1) ``today``: the date of the computation, 2) ``assets``: the list of all asset objects alive on that date, and 3) ``out``: the output numpy array that holds the results of the computation (a 1D array of size equal to the number of assets). The ``compute`` method also gets passed the additional columns declared in the ``inputs`` attribute (in the order they are declared). This is exactly the same for both the filter and the factor implementation. The only difference is that in the filter implementation the results are booleans, while in the factor they are numbers.

Once we have defined the classes, we can use them in building the pipeline by instantiating them with a window length. The window length determines the size of the data passed to the ``compute`` method for the extra (pricing) arguments. For example, if we assume that on the computation date there were a total of 2000 assets available to trade, the ``close_price`` and ``high_price`` arguments will each be a numpy array of shape (10, 2000) - one row per day in the window and one column per asset.
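
The following minimal sketch makes these shapes concrete. The factor itself (how far the highest high in the window sits above the latest close) is made up purely for illustration, using the same ``EquityPricing`` columns as above.

.. code-block:: python

    import numpy as np

    from blueshift.pipeline import CustomFactor
    from blueshift.pipeline.data import EquityPricing

    class DistanceFromHighFactor(CustomFactor):
        # hypothetical factor, for illustration only
        inputs = [EquityPricing.close, EquityPricing.high]

        def compute(self, today, assets, out, close, high):
            # with window_length=10 and N assets alive on `today`,
            # `close` and `high` are numpy arrays of shape (10, N),
            # while `out` is a 1D array of length N
            window_high = np.nanmax(high, axis=0)       # shape (N,)
            out[:] = window_high / close[-1] - 1.0      # window high vs latest close

    # the window length is supplied at instantiation time
    dist_from_high = DistanceFromHighFactor(window_length=10)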

Pipeline Examples
-------------------

Below are a few concrete examples of pipeline-based strategies. The first is a simple timeseries momentum strategy based on returns over the last 3 months. The second example adds a low beta filter to the same strategy, and the third one adds some fundamental filtering to improve the outcome. Note the general flow: we define the strategy (and any custom filters/factors if required), attach the pipeline object to the strategy with a name using the ``attach_pipeline`` API method, and later query the output for a specific day using the ``pipeline_output`` API method.

Example 1 - Simple Timeseries Momentum
+++++++++++++++++++++++++++++++++++++++

This strategy picks the best 5 stocks every month, based on returns over the last 3 months, and allocates capital to them equally. The filters used are a liquidity filter (dollar traded volume > 10M) and a positive-returns filter (since it is a long-only strategy).

.. code-block:: python

    from blueshift.library.pipelines.pipelines import average_volume_filter, technical_factor
    from blueshift.library.technicals.indicators import roc
    from blueshift.pipeline import Pipeline
    from blueshift.errors import NoFurtherDataError
    from blueshift.api import order_target_percent, schedule_function, log_info
    from blueshift.api import date_rules, time_rules, get_datetime
    from blueshift.api import attach_pipeline, pipeline_output

    def initialize(context):
        # The context variables can be accessed by other methods
        context.params = {'lookback':3,
                          'size':5,
                          'min_volume':1E7
                          }

        # Call rebalance function on the first trading day of each month
        schedule_function(run_strategy, date_rules.month_start(),
                          time_rules.market_close(minutes=30))

        # Set up the pipelines for the strategy
        attach_pipeline(make_screener(context), name='my_screener')

    def make_screener(context):
        pipe = Pipeline()

        # get the strategy parameters
        lookback = context.params['lookback']*21
        v = context.params['min_volume']

        # Set the volume filter - this is a built-in filter
        volume_filter = average_volume_filter(lookback, v)

        # compute past returns - using a built-in factor
        roc_factor = technical_factor(lookback+5, roc, lookback)
        pipe.add(roc_factor, 'roc')
        roc_filter = roc_factor > 0    # positive momentum only for long-only

        # add the filtering
        pipe.set_screen(roc_filter & volume_filter)

        return pipe

    def screener(context, data):
        try:
            pipeline_results = pipeline_output('my_screener')
        except NoFurtherDataError:
            log_info('no pipeline for {}'.format(get_datetime()))
            return []

        pipeline_results = pipeline_results.dropna()    # remove NAs
        selected = pipeline_results.sort_values(
                'roc')[-(context.params['size']):]      # pick the best N stocks

        # return the asset list; note the pipeline result dataframe has
        # assets (that survived the filters) as index and the computed
        # factors as the columns
        return selected.index.tolist()

    def run_strategy(context, data):
        # get screened assets to buy
        assets = screener(context, data)
        current_holdings = context.portfolio.positions.keys()    # current holdings
        exits = set(current_holdings) - set(assets)               # assets to sell

        for asset in exits:
            # exit assets no longer in the selected list
            order_target_percent(asset, 0)

        if assets:
            # rebalance the new list with equal weights
            sizing = 1.0/len(assets)
            for asset in assets:
                order_target_percent(asset, sizing)

Example 2 - Add a low beta filter
+++++++++++++++++++++++++++++++++++

In this example, we modify the pipeline building function to add a low beta filter. To do that, we define a custom factor that computes the beta of each stock relative to the market index. Note that the function ``vectorized_beta`` takes in an array of dependent returns (a numpy array) and a vector of independent returns, and computes the regression beta.

.. code-block:: python

    from blueshift.library.pipelines.pipelines import average_volume_filter, technical_factor
    from blueshift.library.technicals.indicators import roc
    from blueshift.pipeline import Pipeline
    from blueshift.pipeline.factors.statistical import vectorized_beta
    from zipline_pipeline.pipeline import CustomFactor
    from blueshift.pipeline.data import EquityPricing
    import warnings
    import numpy as np
    from blueshift.errors import NoFurtherDataError
    from blueshift.api import order_target_percent, schedule_function, log_info
    from blueshift.api import symbol, date_rules, time_rules, get_datetime
    from blueshift.api import attach_pipeline, pipeline_output

    def compute_beta(context, asset, lookback):
        class SimpleBeta(CustomFactor):
            inputs = [EquityPricing.close]

            def compute(self, today, assets, out, close):
                n = len(close)
                nbars = n + 10
                # fetch the benchmark (market index) price history
                px = context.data_portal.history(asset, 'close', nbars, '1d', dt=today)
                px = px.iloc[-n:].values
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore", category=RuntimeWarning)
                    independent = np.diff(px)/px[:(n-1)]
                    independent = independent.reshape(n-1, 1)
                    dependent = np.diff(close, axis=0)/close[:(n-1)]
                    out[:] = vectorized_beta(dependent, independent, int(n/4))

        return SimpleBeta(window_length=lookback)

    def initialize(context):
        # The context variables can be accessed by other methods
        context.params = {'lookback':3,
                          'size':5,
                          'min_volume':1E7
                          }

        # Call rebalance function on the first trading day of each month
        schedule_function(run_strategy, date_rules.month_start(),
                          time_rules.market_close(minutes=30))

        # Set up the pipelines for the strategy
        attach_pipeline(make_screener(context), name='my_screener')

    def make_screener(context):
        pipe = Pipeline()

        # get the strategy parameters
        lookback = context.params['lookback']*21
        v = context.params['min_volume']

        # Set the volume filter - this is a built-in filter
        volume_filter = average_volume_filter(lookback, v)

        # add a beta filter
        market = symbol('NIFTY')
        beta = compute_beta(context, market, 60)

        # compute past returns - using a built-in factor
        roc_factor = technical_factor(lookback+5, roc, lookback)
        pipe.add(roc_factor, 'roc')
        pipe.add(beta, 'beta')
        roc_filter = roc_factor > 0                   # positive momentum only for long-only
        beta_filter = (beta > -0.5) & (beta < 0.5)    # select a low beta band

        # add the filtering
        pipe.set_screen(roc_filter & volume_filter & beta_filter)

        return pipe

    def screener(context, data):
        try:
            pipeline_results = pipeline_output('my_screener')
        except NoFurtherDataError:
            log_info('no pipeline for {}'.format(get_datetime()))
            return []

        pipeline_results = pipeline_results.dropna()    # remove NAs
        selected = pipeline_results.sort_values(
                'roc')[-(context.params['size']):]      # pick the best N stocks

        # return the asset list; note the pipeline result dataframe has
        # assets (that survived the filters) as index and the computed
        # factors as the columns
        return selected.index.tolist()

    def run_strategy(context, data):
        # get screened assets to buy
        assets = screener(context, data)
        current_holdings = context.portfolio.positions.keys()    # current holdings
        exits = set(current_holdings) - set(assets)               # assets to sell

        for asset in exits:
            # exit assets no longer in the selected list
            order_target_percent(asset, 0)

        if assets:
            # rebalance the new list with equal weights
            sizing = 1.0/len(assets)
            for asset in assets:
                order_target_percent(asset, sizing)

Example 3 - Add a fundamental filter
++++++++++++++++++++++++++++++++++++++

In this example, we add some fundamental filters based on operating margin, avoiding stocks with very high or very low margins, which may indicate unstable businesses not suitable for quant strategies.

.. code-block:: python

    from blueshift.library.pipelines.pipelines import average_volume_filter, technical_factor
    from blueshift.library.technicals.indicators import roc
    from blueshift.pipeline import Pipeline
    from blueshift.pipeline.factors.statistical import vectorized_beta
    from zipline_pipeline.pipeline import CustomFactor
    from blueshift.pipeline.data import EquityPricing
    from blueshift.pipeline.data import QuarterlyFundamental, EquitySector
    import warnings
    import numpy as np
    from blueshift.errors import NoFurtherDataError
    from blueshift.api import order_target_percent, schedule_function, log_info
    from blueshift.api import symbol, date_rules, time_rules, get_datetime
    from blueshift.api import attach_pipeline, pipeline_output

    def compute_beta(context, asset, lookback):
        class SimpleBeta(CustomFactor):
            inputs = [EquityPricing.close]

            def compute(self, today, assets, out, close):
                n = len(close)
                nbars = n + 10
                # fetch the benchmark (market index) price history
                px = context.data_portal.history(asset, 'close', nbars, '1d', dt=today)
                px = px.iloc[-n:].values
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore", category=RuntimeWarning)
                    independent = np.diff(px)/px[:(n-1)]
                    independent = independent.reshape(n-1, 1)
                    dependent = np.diff(close, axis=0)/close[:(n-1)]
                    out[:] = vectorized_beta(dependent, independent, int(n/4))

        return SimpleBeta(window_length=lookback)

    def initialize(context):
        # The context variables can be accessed by other methods
        context.params = {'lookback':3,
                          'size':5,
                          'min_volume':1E7
                          }

        # Call rebalance function on the first trading day of each month
        schedule_function(run_strategy, date_rules.month_start(),
                          time_rules.market_close(minutes=30))

        # Set up the pipelines for the strategy
        attach_pipeline(make_screener(context), name='my_screener')

    def make_screener(context):
        pipe = Pipeline()

        # get the strategy parameters
        lookback = context.params['lookback']*21
        v = context.params['min_volume']

        # Set the volume filter - this is a built-in filter
        volume_filter = average_volume_filter(lookback, v)

        # add a beta filter
        market = symbol('NIFTY')
        beta = compute_beta(context, market, 60)

        # compute past returns - using a built-in factor
        roc_factor = technical_factor(lookback+5, roc, lookback)
        pipe.add(roc_factor, 'roc')
        pipe.add(beta, 'beta')
        roc_filter = roc_factor > 0                   # positive momentum only for long-only
        beta_filter = (beta > -0.7) & (beta < 0.7)    # select a low beta band

        # add some fundamental filters
        ebitda = QuarterlyFundamental.ebitda.latest
        sales = QuarterlyFundamental.operating_income.latest
        margin = ebitda/sales
        # sector filter is disabled
        #sector = EquitySector.sector.latest
        profitability1 = margin > 0.05
        profitability2 = margin < 0.5
        # sector filter is disabled
        # sector_filter = sector.eq('Pharmaceuticals & Biotechnology')

        # add the filtering
        pipe.set_screen(roc_filter & volume_filter & beta_filter
                        & profitability1 & profitability2)

        return pipe

    def screener(context, data):
        try:
            pipeline_results = pipeline_output('my_screener')
        except NoFurtherDataError:
            log_info('no pipeline for {}'.format(get_datetime()))
            return []

        pipeline_results = pipeline_results.dropna()    # remove NAs
        selected = pipeline_results.sort_values(
                'roc')[-(context.params['size']):]      # pick the best N stocks

        # return the asset list; note the pipeline result dataframe has
        # assets (that survived the filters) as index and the computed
        # factors as the columns
        return selected.index.tolist()

    def run_strategy(context, data):
        # get screened assets to buy
        assets = screener(context, data)
        current_holdings = context.portfolio.positions.keys()    # current holdings
        exits = set(current_holdings) - set(assets)               # assets to sell

        for asset in exits:
            # exit assets no longer in the selected list
            order_target_percent(asset, 0)

        if assets:
            # rebalance the new list with equal weights
            sizing = 1.0/len(assets)
            for asset in assets:
                order_target_percent(asset, sizing)

These examples show the potential of this very powerful API for large universe strategies. For more examples, check out the Jupyter notebook section.