Chore: Make release 1.0.74
martinroberson authored and Vanden Bon, David V [GBM Public] committed May 6, 2024
1 parent 44362ac commit b6f16ad
Showing 40 changed files with 1,605 additions and 124 deletions.
4 changes: 3 additions & 1 deletion docs/classes/gs_quant.markets.position_set.PositionSet.rst
@@ -1,4 +1,4 @@
gs\_quant.markets.position\_set.PositionSet
===========================================

.. currentmodule:: gs_quant.markets.position_set
@@ -26,6 +26,8 @@ gs\_quant.markets.position\_set.PositionSet
~PositionSet.resolve
~PositionSet.to_frame
~PositionSet.to_target
~PositionSet.price_many
~PositionSet.resolve_many



6 changes: 6 additions & 0 deletions docs/classes/gs_quant.markets.position_set.PositionSet.price_many.rst
@@ -0,0 +1,6 @@
gs\_quant.markets.position\_set.PositionSet.price\_many
=======================================================

.. currentmodule:: gs_quant.markets.position_set

.. automethod:: PositionSet.price_many
6 changes: 6 additions & 0 deletions docs/classes/gs_quant.markets.position_set.PositionSet.resolve_many.rst
@@ -0,0 +1,6 @@
gs\_quant.markets.position\_set.PositionSet.resolve\_many
=========================================================

.. currentmodule:: gs_quant.markets.position_set

.. automethod:: PositionSet.resolve_many
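For context, a minimal usage sketch of the two new batch methods (credentials and tickers below are placeholders, and both methods are assumed to accept a list of PositionSet objects, mirroring the existing resolve and price methods):

from gs_quant.markets.position_set import Position, PositionSet
from gs_quant.session import Environment, GsSession

GsSession.use(Environment.PROD, client_id='my_id', client_secret='my_secret')  # placeholder credentials

position_sets = [
    PositionSet([Position(identifier='AAPL UW'), Position(identifier='MSFT UW')]),
    PositionSet([Position(identifier='IBM UN')]),
]

PositionSet.resolve_many(position_sets)  # resolve all identifiers in one backend call
PositionSet.price_many(position_sets)    # then price every set in bulk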
14 changes: 14 additions & 0 deletions gs_quant/api/gs/assets.py
@@ -230,6 +230,20 @@ def resolve_assets(
query = dict(where=where, limit=limit, fields=fields, asOfTime=as_of.strftime("%Y-%m-%dT%H:%M:%SZ"))
return GsSession.current._post('/positions/resolver', payload=query)

    @classmethod
    def get_many_asset_xrefs(
            cls,
            identifier: List[str],
            fields: IdList = [],
            limit: int = 100,
            as_of: dt.datetime = dt.datetime.today(),
            **kwargs
    ) -> Tuple[dict, ...]:
        # batch counterpart of get_asset_xrefs: one POST resolves
        # cross-references for a whole list of identifiers
        where = dict(identifier=identifier, **kwargs)
        query = dict(where=where, limit=limit, fields=fields, asOfTime=as_of.strftime("%Y-%m-%dT%H:%M:%SZ"))

        return GsSession.current._post('/assets/xrefs/query', payload=query).get('results')

    @classmethod
    @_cached
    def get_asset_xrefs(
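A sketch of calling the new batch lookup (identifiers and fields are illustrative):

from gs_quant.api.gs.assets import GsAssetApi

# one request returns xref records for every matched identifier
xrefs = GsAssetApi.get_many_asset_xrefs(
    identifier=['AAPL UW', 'MSFT UW'],
    fields=['identifier', 'bbid', 'cusip'],
    limit=50,
)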
18 changes: 12 additions & 6 deletions gs_quant/api/gs/data.py
@@ -712,7 +712,8 @@ def _get_market_data_filters(asset_ids: List[str],
where: Union[FieldFilterMap, Dict] = None,
source: Union[str] = None,
real_time: bool = False,
measure='Curve'):
measure='Curve',
vendor: str = ''):
inner = {
'entityIds': asset_ids,
'queryType': query_type.value if isinstance(query_type, QueryType) else query_type,
@@ -723,6 +724,8 @@
measure
]
}
if vendor != '':
inner['vendor'] = vendor
return inner

@staticmethod
@@ -731,7 +734,8 @@ def build_interval_chunked_market_data_queries(asset_ids: List[str],
where: Union[FieldFilterMap, Dict] = None,
source: Union[str] = None,
real_time: bool = False,
measure='Curve') -> List[dict]:
measure='Curve',
vendor: str = '') -> List[dict]:
parallel_interval = 365 # chunk over a year

def chunk_time(start, end) -> tuple:
@@ -751,7 +755,8 @@ def chunk_time(start, end) -> tuple:
start_key, end_key = 'startDate', 'endDate'

for s, e in chunk_time(start, end):
inner = copy(GsDataApi._get_market_data_filters(asset_ids, query_type, where, source, real_time, measure))
inner = copy(GsDataApi._get_market_data_filters(asset_ids, query_type, where, source, real_time, measure,
vendor))
inner[start_key], inner[end_key] = s, e
queries.append({
'queries': [inner]
@@ -768,12 +773,13 @@ def build_market_data_query(asset_ids: List[str],
source: Union[str] = None,
real_time: bool = False,
measure='Curve',
parallelize_queries: bool = False) -> Union[dict, List[dict]]:
parallelize_queries: bool = False,
vendor: str = '') -> Union[dict, List[dict]]:
if parallelize_queries:
return GsDataApi.build_interval_chunked_market_data_queries(asset_ids, query_type, where, source, real_time,
measure)
measure, vendor)

inner = GsDataApi._get_market_data_filters(asset_ids, query_type, where, source, real_time, measure)
inner = GsDataApi._get_market_data_filters(asset_ids, query_type, where, source, real_time, measure, vendor)
if DataContext.current.interval is not None:
inner['interval'] = DataContext.current.interval
if real_time:
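With vendor threaded through all three query builders, a caller can now pin results to a single data vendor. A sketch (the asset ID and vendor string are illustrative; query_type may be a plain string since the builder accepts either a QueryType or a string):

from gs_quant.api.gs.data import GsDataApi

query = GsDataApi.build_market_data_query(
    asset_ids=['MA4B66MW5E27U8P32SB'],   # illustrative Marquee asset id
    query_type='Implied Volatility',
    vendor='Goldman Sachs',              # new: filter market data to one vendor
)

An empty vendor (the default) leaves the request payload unchanged, so existing callers are unaffected.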
12 changes: 12 additions & 0 deletions gs_quant/api/gs/price.py
@@ -16,6 +16,7 @@

from gs_quant.session import GsSession
from gs_quant.target.price import *
from gs_quant.target.positions_v2_pricing import *


class GsPriceApi:
@@ -25,3 +26,14 @@ class GsPriceApi:
def price_positions(cls, inputs: PositionSetPriceInput) -> PositionSetPriceResponse:
url = '/price/positions'
return GsSession.current._post(url, payload=inputs, cls=PositionSetPriceResponse)

    @classmethod
    def price_many_positions(cls, pricing_request: PositionsPricingRequest) -> dict:
        url = '/positions/price/bulk'
        # the bulk endpoint lives on the v2 API, so switch the shared session
        # over for this call and restore v1 afterwards
        GsSession.current.api_version = "v2"
        pricing_response = GsSession.current._post(url, payload=pricing_request)
        GsSession.current.api_version = "v1"

        positions = pricing_response.get("positions")

        return positions
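A hedged sketch of driving the bulk pricer (construction of the request is elided; build_pricing_request below is a hypothetical helper, not part of gs_quant):

from gs_quant.api.gs.price import GsPriceApi

request = build_pricing_request()  # hypothetical: assembles a PositionsPricingRequest
positions = GsPriceApi.price_many_positions(request)  # priced position records

Note that the method flips the shared session to v2 and back around the POST; if the POST raises, the session is left on v2, so wrapping the restore in try/finally would be a safer pattern.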
4 changes: 3 additions & 1 deletion gs_quant/api/gs/risk.py
@@ -167,7 +167,9 @@ async def handle_websocket():
# New results have been received
request_id = None
try:
request_id, status_result_str = result_listener.result().split(';', 1)
raw_res = result_listener.result()
parsed_res = raw_res.decode() if isinstance(raw_res, bytes) else raw_res
request_id, status_result_str = parsed_res.split(';', 1)
status, result_str = status_result_str[0], status_result_str[1:]
except Exception as ee:
status = 'E'
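The decode guard above exists because websocket frames may arrive as bytes, and bytes cannot be split with a str separator; a standalone illustration:

raw = b'req-1;OK...'                                  # b'...'.split(';') would raise TypeError
parsed = raw.decode() if isinstance(raw, bytes) else raw
request_id, status_result = parsed.split(';', 1)      # safe for both str and bytes frames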
58 changes: 53 additions & 5 deletions gs_quant/api/risk.py
@@ -21,24 +21,71 @@
from abc import ABCMeta, abstractmethod
from concurrent.futures import TimeoutError
from threading import Thread
from typing import Iterable, Optional, Union, Tuple
from typing import Iterable, Optional, Union, Tuple, Dict, Any

from opentracing import Span
from tqdm import tqdm

from gs_quant.api.api_session import ApiWithCustomSession
from gs_quant.base import RiskKey, Sentinel
from gs_quant.base import RiskKey, Sentinel, Priceable
from gs_quant.risk import ErrorValue, RiskRequest
from gs_quant.risk.result_handlers import result_handlers
from gs_quant.risk.results import PricingFuture
from gs_quant.session import GsSession
from gs_quant.tracing import Tracer

_logger = logging.getLogger(__name__)


class RiskApi(ApiWithCustomSession, metaclass=ABCMeta):
class GenericRiskApi(ApiWithCustomSession, metaclass=ABCMeta):
    batch_dates = True

    @classmethod
    @abstractmethod
    def populate_pending_futures(cls, requests: list, session: GsSession,
                                 pending: Dict[Tuple[RiskKey, Priceable], PricingFuture]):
        ...

    @classmethod
    @abstractmethod
    def build_keyed_results(cls, request: RiskRequest, results: Union[Iterable, Exception]) -> \
            Dict[Tuple[RiskKey, Priceable], Any]:
        ...


class RiskApi(GenericRiskApi, metaclass=ABCMeta):
    __SHUTDOWN_SENTINEL = Sentinel('QueueListenerShutdown')

    @classmethod
    def populate_pending_futures(cls, requests: list, session: GsSession,
                                 pending: Dict[Tuple[RiskKey, Priceable], PricingFuture], **kwargs):
        results = queue.Queue()
        done = False
        max_concurrent, progress_bar, timeout, span, cache_impl, is_async = \
            [kwargs.get(arg) for arg in ['max_concurrent', 'progress_bar', 'timeout', 'span', 'cache_impl',
                                         'is_async']]
        try:
            with session:
                cls.run(requests, results, max_concurrent, progress_bar,
                        timeout=timeout, span=span)
        except Exception as e:
            cls.enqueue(results, ((k, e) for k in pending.keys()))

        while pending and not done:
            done, chunk_results = cls.drain_queue(results)
            for (risk_key_, priceable_), result in chunk_results:
                future = pending.pop((risk_key_, priceable_), None)
                if future is not None:
                    future.set_result(result)

                    if cache_impl is not None:
                        cache_impl.put(risk_key_, priceable_, result)

        if not is_async:
            # In async mode we can't tell if we've completed, we could be re-used
            while pending:
                (risk_key_, _), future = pending.popitem()
                future.set_result(ErrorValue(risk_key_, 'No result returned'))

@classmethod
@abstractmethod
async def get_results(cls, responses: asyncio.Queue, results: asyncio.Queue,
@@ -129,7 +176,7 @@ def run(cls,
timeout: Optional[int] = None,
span: Optional[str] = None):
def _process_results(completed: list):
chunk_results = tuple(itertools.chain.from_iterable(cls._handle_results(request, result).items()
chunk_results = tuple(itertools.chain.from_iterable(cls.build_keyed_results(request, result).items()
for request, result in completed))
cls.enqueue(results, chunk_results, wait=True)

@@ -286,7 +333,8 @@ def num_risk_keys(request: RiskRequest):
asyncio.set_event_loop(None)

@classmethod
def _handle_results(cls, request: RiskRequest, results: Union[Iterable, Exception]) -> dict:
def build_keyed_results(cls, request: RiskRequest, results: Union[Iterable, Exception]) -> \
Dict[Tuple[RiskKey, Priceable], Any]:
formatted_results = {}

if isinstance(results, Exception):
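Conceptually, populate_pending_futures drains keyed results off a queue and resolves whichever futures are still pending; a stripped-down sketch of that pattern, independent of the GS session machinery:

import queue
from concurrent.futures import Future

pending = {('risk_key', 'priceable'): Future()}   # futures awaiting results, keyed like the real API
results: queue.Queue = queue.Queue()
results.put((('risk_key', 'priceable'), 42.0))    # a worker delivers one keyed result

while pending:
    key, value = results.get()
    future = pending.pop(key, None)               # ignore results nobody is waiting on
    if future is not None:
        future.set_result(value)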
@@ -19,7 +19,7 @@
def format_df(data_dict):
df = pd.concat(data_dict, axis=1)
df.columns = data_dict.keys()
return df.fillna(method='ffill').dropna()
return df.ffill().dropna()


def volatility_screen(crosses, start_date, end_date, tenor='3m', plot=True):
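The change swaps fillna(method='ffill'), which pandas deprecated in the 2.x line, for the equivalent DataFrame.ffill(); for example:

import pandas as pd

df = pd.DataFrame({'usdjpy': [1.0, None, 3.0], 'eurusd': [None, 2.0, 2.1]})
print(df.ffill().dropna())  # forward-fill gaps, then drop rows that are still incomplete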