From 915d1d47c567b084a44be9d8370262322a286c60 Mon Sep 17 00:00:00 2001 From: Patrick Qian Date: Thu, 16 May 2024 17:32:30 +0200 Subject: [PATCH 1/2] add new criteria --- source/jormungandr/jormungandr/interfaces/v1/journey_common.py | 1 + source/jormungandr/jormungandr/utils.py | 2 ++ source/navitia-proto | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/source/jormungandr/jormungandr/interfaces/v1/journey_common.py b/source/jormungandr/jormungandr/interfaces/v1/journey_common.py index d86b17f350..e0f0307645 100644 --- a/source/jormungandr/jormungandr/interfaces/v1/journey_common.py +++ b/source/jormungandr/jormungandr/interfaces/v1/journey_common.py @@ -521,6 +521,7 @@ def __init__(self, output_type_serializer): 'occupancy', 'arrival_stop_attractivity', 'departure_stop_attractivity', + 'pseudo_duration', ] ), help="choose the criteria used to compute pt journeys, feature in beta ", diff --git a/source/jormungandr/jormungandr/utils.py b/source/jormungandr/jormungandr/utils.py index 85911485f6..af4be911e1 100644 --- a/source/jormungandr/jormungandr/utils.py +++ b/source/jormungandr/jormungandr/utils.py @@ -1053,6 +1053,8 @@ def _set_arrival_attractivity(stop_point_id, location): req.journeys.criteria = request_pb2.ArrivalStopAttractivity elif journey_parameters.criteria == "departure_stop_attractivity": req.journeys.criteria = request_pb2.DepartureStopAttractivity + elif journey_parameters.criteria == "pseudo_duration": + req.journeys.criteria = request_pb2.PseudoDuration return req diff --git a/source/navitia-proto b/source/navitia-proto index b9e749ddd0..6cd9d7e073 160000 --- a/source/navitia-proto +++ b/source/navitia-proto @@ -1 +1 @@ -Subproject commit b9e749ddd01803d5429013d6fd7ed4d5cfc8250d +Subproject commit 6cd9d7e073af745a7638bb35372160206301b7ee From 9487a98ac9a534bd7ced026e3fee2aa6d5f03be1 Mon Sep 17 00:00:00 2001 From: Patrick Qian Date: Mon, 3 Jun 2024 09:48:21 +0200 Subject: [PATCH 2/2] add opentelemetry trace --- source/jormungandr/jormungandr/__init__.py | 15 ++++ .../jormungandr/pt_planners/pt_planner.py | 6 ++ .../jormungandr/scenarios/distributed.py | 48 ++++++++---- .../helper_classes/fallback_durations.py | 57 +++++++++------ .../helper_classes/proximities_by_crowfly.py | 63 +++++++++------- .../scenarios/helper_classes/pt_journey.py | 66 +++++++++++------ .../helper_classes/streetnetwork_path.py | 73 ++++++++++++++----- .../jormungandr/scenarios/new_default.py | 31 +++++--- 8 files changed, 242 insertions(+), 117 deletions(-) diff --git a/source/jormungandr/jormungandr/__init__.py b/source/jormungandr/jormungandr/__init__.py index 9d8c8ece20..50f9a101a0 100644 --- a/source/jormungandr/jormungandr/__init__.py +++ b/source/jormungandr/jormungandr/__init__.py @@ -38,8 +38,23 @@ from flask_cors import CORS from jormungandr import init + +#### OpenTelemetry traces configuration ###################################### +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from opentelemetry.trace import set_tracer_provider + +tracer_provider = TracerProvider() +tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) +set_tracer_provider(tracer_provider) + app = Flask(__name__) # type: Flask +# Flask auto-instrumentation +from opentelemetry.instrumentation.flask import FlaskInstrumentor + +FlaskInstrumentor().instrument_app(app) + init.load_configuration(app) init.logger(app) diff --git a/source/jormungandr/jormungandr/pt_planners/pt_planner.py b/source/jormungandr/jormungandr/pt_planners/pt_planner.py index 494f609219..51511af622 100644 --- a/source/jormungandr/jormungandr/pt_planners/pt_planner.py +++ b/source/jormungandr/jormungandr/pt_planners/pt_planner.py @@ -75,6 +75,9 @@ def __init__( criteria=None, olympic_site_params=None, language="fr-FR", + departure_coord=None, + arrival_coord=None, + max_radius_to_free_access=None, ): self.max_duration = max_duration @@ -98,6 +101,9 @@ def __init__( self.criteria = criteria self.olympic_site_params = olympic_site_params or {} self.language = language + self.departure_coord = departure_coord + self.arrival_coord = arrival_coord + self.max_radius_to_free_access = max_radius_to_free_access # Needed for GraphicalIsochrones diff --git a/source/jormungandr/jormungandr/scenarios/distributed.py b/source/jormungandr/jormungandr/scenarios/distributed.py index addabb3469..c2c5e46eec 100644 --- a/source/jormungandr/jormungandr/scenarios/distributed.py +++ b/source/jormungandr/jormungandr/scenarios/distributed.py @@ -62,6 +62,11 @@ StreetNetworkException, ) +from opentelemetry import trace, baggage + + +tracer = trace.get_tracer(__name__) + class PartialResponseContext(object): requested_orig = None @@ -138,6 +143,8 @@ def _compute_journeys( } requested_direct_path_modes.update(requested_dep_modes_with_pt) + parent_ctx = baggage.set_baggage("compute_journeys", "parent") + if context.partial_response_is_empty: logger.debug('requesting places by uri orig: %s dest %s', request['origin'], request['destination']) @@ -145,7 +152,7 @@ def _compute_journeys( context.requested_dest_obj = pt_object_destination_detail context.streetnetwork_path_pool = StreetNetworkPathPool( - future_manager=future_manager, instance=instance + future_manager=future_manager, instance=instance, ctx=parent_ctx ) period_extremity = PeriodExtremity(request['datetime'], request['clockwise']) @@ -203,6 +210,7 @@ def _compute_journeys( request_id="{}_crowfly_orig".format(request_id), o_d_crowfly_distance=crowfly_distance, direct_path_timeout=direct_path_timeout, + ctx=parent_ctx, ) context.dest_proximities_by_crowfly = ProximitiesByCrowflyPool( @@ -216,6 +224,7 @@ def _compute_journeys( request_id="{}_crowfly_dest".format(request_id), o_d_crowfly_distance=crowfly_distance, direct_path_timeout=direct_path_timeout, + ctx=parent_ctx, ) context.orig_places_free_access = PlacesFreeAccess( @@ -245,6 +254,7 @@ def _compute_journeys( direct_path_type=StreetNetworkPathType.BEGINNING_FALLBACK, request_id="{}_fallback_orig".format(request_id), direct_path_timeout=direct_path_timeout, + ctx=parent_ctx, ) context.dest_fallback_durations_pool = FallbackDurationsPool( @@ -259,6 +269,7 @@ def _compute_journeys( direct_path_type=StreetNetworkPathType.ENDING_FALLBACK, request_id="{}_fallback_dest".format(request_id), direct_path_timeout=direct_path_timeout, + ctx=parent_ctx, ) pt_journey_pool = PtJourneyPool( @@ -273,6 +284,7 @@ def _compute_journeys( request=request, request_type=request_type, request_id="{}_ptjourney".format(request_id), + ctx=parent_ctx, ) pt_journey_elements = wait_and_build_crowflies( @@ -319,7 +331,11 @@ def finalise_journeys(self, future_manager, request, responses, context, instanc Fallbacks will only be computed for journeys not tagged as 'to_delete' """ - streetnetwork_path_pool = StreetNetworkPathPool(future_manager=future_manager, instance=instance) + parent_ctx = baggage.set_baggage("finalise_journeys", "parent") + + streetnetwork_path_pool = StreetNetworkPathPool( + future_manager=future_manager, instance=instance, ctx=parent_ctx + ) journeys_to_complete = get_journeys_to_complete(responses, context, is_debug) @@ -514,16 +530,17 @@ def call_kraken( type_pb2.ISOCHRONE, ) elif request_type == type_pb2.PLANNER: - return self._scenario._compute_journeys( - future_manager, - pt_object_origin_detail, - pt_object_destination_detail, - request, - instance, - krakens_call, - context, - type_pb2.PLANNER, - ) + with tracer.start_as_current_span(name="distributed_compute_journeys"): + return self._scenario._compute_journeys( + future_manager, + pt_object_origin_detail, + pt_object_destination_detail, + request, + instance, + krakens_call, + context, + type_pb2.PLANNER, + ) else: abort(400, message="This type of request is not supported with distributed") except PtException as e: @@ -542,9 +559,10 @@ def finalise_journeys(self, request, responses, context, instance, is_debug, req with FutureManager(self.greenlet_pool_size) as future_manager, timed_logger( logger, 'finalise_journeys', "{}_finalise_journeys".format(request_id) ): - self._scenario.finalise_journeys( - future_manager, request, responses, context, instance, is_debug, request_id - ) + with tracer.start_as_current_span(name="finalise_journeys"): + self._scenario.finalise_journeys( + future_manager, request, responses, context, instance, is_debug, request_id + ) from jormungandr.scenarios import journey_filter diff --git a/source/jormungandr/jormungandr/scenarios/helper_classes/fallback_durations.py b/source/jormungandr/jormungandr/scenarios/helper_classes/fallback_durations.py index 158089d60e..e0f2ae1613 100644 --- a/source/jormungandr/jormungandr/scenarios/helper_classes/fallback_durations.py +++ b/source/jormungandr/jormungandr/scenarios/helper_classes/fallback_durations.py @@ -66,6 +66,11 @@ AccessMapElement = namedtuple('AccessMapElement', ['stop_point_uri', 'access_point']) +from opentelemetry import trace, baggage + + +tracer = trace.get_tracer(__name__) + class FallbackDurations: """ @@ -95,6 +100,7 @@ def __init__( speed_switcher, request_id, direct_path_type=StreetNetworkPathType.BEGINNING_FALLBACK, + ctx=None, ): """ :param future_manager: a module that manages the future pool properly @@ -122,6 +128,7 @@ def __init__( self._streetnetwork_service = instance.get_street_network(mode, request) self._logger = logging.getLogger(__name__) self._request_id = request_id + self._ctx = ctx self._async_request() def _get_duration(self, resp, place): @@ -358,29 +365,30 @@ def _determine_centers_isochrone(self): return result def _do_request(self, futures, centers_isochrone, stop_points): - if len(futures) == 1: - return futures[0].wait_and_get() - else: - fallback_duration = dict() - for place_isochrone in stop_points: - best_duration = float("inf") - best_element = None - for index, future in enumerate(futures): - fallback = future.wait_and_get() - element = fallback.get(place_isochrone.uri) - if element and element.duration < best_duration: - best_duration = element.duration - best_element = DurationElement( - element.duration, - element.status, - element.car_park, - element.car_park_crowfly_duration, - element.via_pt_access, - centers_isochrone[index], - ) - if best_element: - fallback_duration[place_isochrone.uri] = best_element - return fallback_duration + with tracer.start_as_current_span(name="fallback_durations", context=self._ctx): + if len(futures) == 1: + return futures[0].wait_and_get() + else: + fallback_duration = dict() + for place_isochrone in stop_points: + best_duration = float("inf") + best_element = None + for index, future in enumerate(futures): + fallback = future.wait_and_get() + element = fallback.get(place_isochrone.uri) + if element and element.duration < best_duration: + best_duration = element.duration + best_element = DurationElement( + element.duration, + element.status, + element.car_park, + element.car_park_crowfly_duration, + element.via_pt_access, + centers_isochrone[index], + ) + if best_element: + fallback_duration[place_isochrone.uri] = best_element + return fallback_duration def _async_request(self): logger = logging.getLogger(__name__) @@ -505,6 +513,7 @@ def __init__( request_id, direct_path_timeout, direct_path_type=StreetNetworkPathType.BEGINNING_FALLBACK, + ctx=None, ): super(FallbackDurationsPool, self).__init__() self._future_manager = future_manager @@ -522,6 +531,7 @@ def __init__( self._request_id = request_id self._overrided_uri_map = defaultdict(dict) + self._ctx = ctx self._async_request(direct_path_timeout) @property @@ -560,6 +570,7 @@ def _async_request(self, direct_path_timeout): self._speed_switcher, "{}_{}".format(self._request_id, mode), self._direct_path_type, + self._ctx, ) self._value[mode] = fallback_durations diff --git a/source/jormungandr/jormungandr/scenarios/helper_classes/proximities_by_crowfly.py b/source/jormungandr/jormungandr/scenarios/helper_classes/proximities_by_crowfly.py index 675d27f103..afa54b4d05 100644 --- a/source/jormungandr/jormungandr/scenarios/helper_classes/proximities_by_crowfly.py +++ b/source/jormungandr/jormungandr/scenarios/helper_classes/proximities_by_crowfly.py @@ -34,6 +34,11 @@ import logging from navitiacommon import type_pb2 +from opentelemetry import trace, baggage + + +tracer = trace.get_tracer(__name__) + class ProximitiesByCrowfly: """ @@ -54,6 +59,7 @@ def __init__( request, request_id, depth, + ctx, ): self._future_manager = future_manager self._instance = instance @@ -72,6 +78,7 @@ def __init__( self._forbidden_uris = utils.get_poi_params(request['forbidden_uris[]']) self._allowed_id = utils.get_poi_params(request['allowed_id[]']) self._pt_planner = self._instance.get_pt_planner(request['_pt_planner']) + self._ctx = ctx self._async_request() @new_relic.distributedEvent("get_crowfly", "street_network") @@ -98,32 +105,35 @@ def _get_crow_fly(self): ) def _do_request(self): - logger = logging.getLogger(__name__) - - # When max_duration_to_pt is 0, there is no need to compute the fallback to pt, except if place is a stop_point - # or a stop_area - if self._max_duration == 0: - logger.debug("max duration equals to 0, no need to compute proximities by crowfly") - - # When max_duration_to_pt is 0, we can get on the public transport ONLY if the place is a stop_point - if self._instance.georef.get_stop_points_from_uri(self._requested_place_obj.uri, self._request_id): - return [self._requested_place_obj] - - coord = utils.get_pt_object_coord(self._requested_place_obj) - if coord.lat and coord.lon: - crow_fly = self._get_crow_fly(self._instance.georef) - - if self._mode == fm.FallbackModes.car.name: - # pick up only sytral_parkings with park_ride = yes - crow_fly = jormungandr.street_network.utils.pick_up_park_ride_car_park(crow_fly) - - logger.debug( - "finish proximities by crowfly from %s in %s", self._requested_place_obj.uri, self._mode - ) - return crow_fly + with tracer.start_as_current_span(name="proximities_by_crowfly", context=self._ctx): + logger = logging.getLogger(__name__) + + # When max_duration_to_pt is 0, there is no need to compute the fallback to pt, except if place is a stop_point + # or a stop_area + if self._max_duration == 0: + logger.debug("max duration equals to 0, no need to compute proximities by crowfly") + + # When max_duration_to_pt is 0, we can get on the public transport ONLY if the place is a stop_point + if self._instance.georef.get_stop_points_from_uri( + self._requested_place_obj.uri, self._request_id + ): + return [self._requested_place_obj] + + coord = utils.get_pt_object_coord(self._requested_place_obj) + if coord.lat and coord.lon: + crow_fly = self._get_crow_fly(self._instance.georef) + + if self._mode == fm.FallbackModes.car.name: + # pick up only sytral_parkings with park_ride = yes + crow_fly = jormungandr.street_network.utils.pick_up_park_ride_car_park(crow_fly) + + logger.debug( + "finish proximities by crowfly from %s in %s", self._requested_place_obj.uri, self._mode + ) + return crow_fly - logger.debug("the coord of requested places is not valid: %s", coord) - return [] + logger.debug("the coord of requested places is not valid: %s", coord) + return [] def _async_request(self): self._value = self._future_manager.create_future(self._do_request) @@ -145,6 +155,7 @@ def __init__( request_id, o_d_crowfly_distance, direct_path_timeout, + ctx, ): """ A ProximitiesByCrowflyPool is a set of ProximitiesByCrowfly grouped by mode @@ -176,6 +187,7 @@ def __init__( self._value = {} self._request_id = request_id self._o_d_crowfly_distance = o_d_crowfly_distance + self._ctx = ctx self._async_request(direct_path_timeout) def _async_request(self, direct_path_timeout): @@ -222,6 +234,7 @@ def _async_request(self, direct_path_timeout): request=self._request, request_id=self._request_id, depth=depth, + ctx=self._ctx, ) self._value[mode] = p diff --git a/source/jormungandr/jormungandr/scenarios/helper_classes/pt_journey.py b/source/jormungandr/jormungandr/scenarios/helper_classes/pt_journey.py index 42ef572686..d921d77541 100644 --- a/source/jormungandr/jormungandr/scenarios/helper_classes/pt_journey.py +++ b/source/jormungandr/jormungandr/scenarios/helper_classes/pt_journey.py @@ -29,7 +29,7 @@ from __future__ import absolute_import from jormungandr import utils, new_relic -from jormungandr.utils import date_to_timestamp +from jormungandr.utils import date_to_timestamp, get_pt_object_coord from jormungandr.street_network.street_network import StreetNetworkPathType from navitiacommon import response_pb2, type_pb2 from collections import namedtuple @@ -41,6 +41,11 @@ PtPoolElement = namedtuple('PtPoolElement', ['dep_mode', 'arr_mode', 'pt_journey']) +from opentelemetry import trace, baggage + + +tracer = trace.get_tracer(__name__) + class PtJourney: """ @@ -52,6 +57,8 @@ def __init__( self, future_manager, instance, + requested_orig_obj, + requested_dest_obj, orig_fallback_durtaions_pool, dest_fallback_durations_pool, dep_mode, @@ -63,9 +70,12 @@ def __init__( isochrone_center, request_type, request_id, + ctx, ): self._future_manager = future_manager self._instance = instance + self._requested_orig_obj = requested_orig_obj + self._requested_dest_obj = requested_dest_obj self._orig_fallback_durtaions_pool = orig_fallback_durtaions_pool self._dest_fallback_durations_pool = dest_fallback_durations_pool self._dep_mode = dep_mode @@ -80,6 +90,7 @@ def __init__( self._logger = logging.getLogger(__name__) self._request_id = request_id self._pt_planner = self._instance.get_pt_planner(request['_pt_planner']) + self._ctx = ctx self._async_request() @new_relic.distributedEvent("journeys", "journeys") @@ -106,34 +117,38 @@ def _do_journeys_request(self): orig_fallback_durations = self._orig_fallback_durtaions_pool.get_best_fallback_durations(self._dep_mode) dest_fallback_durations = self._dest_fallback_durations_pool.get_best_fallback_durations(self._arr_mode) - if ( - not orig_fallback_durations - or not dest_fallback_durations - or not self._request.get('max_duration', 0) - ): - return None + with tracer.start_as_current_span(name="pt_journey", context=self._ctx): - resp = self._journeys(self._pt_planner, orig_fallback_durations, dest_fallback_durations) + if ( + not orig_fallback_durations + or not dest_fallback_durations + or not self._request.get('max_duration', 0) + ): + return None - for j in resp.journeys: - j.internal_id = str(utils.generate_id()) + resp = self._journeys(self._pt_planner, orig_fallback_durations, dest_fallback_durations) + + for j in resp.journeys: + j.internal_id = str(utils.generate_id()) + + if resp.HasField(str("error")): + self._logger.debug( + "pt journey has error dep_mode: %s and arr_mode: %s", self._dep_mode, self._arr_mode + ) + # Here needs to modify error message of no_solution + if not orig_fallback_durations: + resp.error.id = response_pb2.Error.no_origin + resp.error.message = "Public transport is not reachable from origin" + elif not dest_fallback_durations: + resp.error.id = response_pb2.Error.no_destination + resp.error.message = "Public transport is not reachable from destination" - if resp.HasField(str("error")): self._logger.debug( - "pt journey has error dep_mode: %s and arr_mode: %s", self._dep_mode, self._arr_mode + "finish public transport journey with dep_mode: %s and arr_mode: %s", + self._dep_mode, + self._arr_mode, ) - # Here needs to modify error message of no_solution - if not orig_fallback_durations: - resp.error.id = response_pb2.Error.no_origin - resp.error.message = "Public transport is not reachable from origin" - elif not dest_fallback_durations: - resp.error.id = response_pb2.Error.no_destination - resp.error.message = "Public transport is not reachable from destination" - - self._logger.debug( - "finish public transport journey with dep_mode: %s and arr_mode: %s", self._dep_mode, self._arr_mode - ) - return resp + return resp @new_relic.distributedEvent("graphical_isochrone", "graphical_isochrone") def _graphical_isochrone(self, orig_fallback_durations, dest_fallback_durations): @@ -256,6 +271,7 @@ def __init__( request_type, request_id, isochrone_center=None, + ctx=None, ): self._future_manager = future_manager self._instance = instance @@ -273,6 +289,7 @@ def __init__( self._request = request self._value = [] self._request_id = request_id + self._ctx = ctx self._async_request() @staticmethod @@ -369,6 +386,7 @@ def _async_request(self): isochrone_center=self._isochrone_center, request_type=self._request_type, request_id="{}_{}_{}".format(self._request_id, dep_mode, arr_mode), + ctx=self._ctx, ) self._value.append(PtPoolElement(dep_mode, arr_mode, pt_journey)) diff --git a/source/jormungandr/jormungandr/scenarios/helper_classes/streetnetwork_path.py b/source/jormungandr/jormungandr/scenarios/helper_classes/streetnetwork_path.py index 16e87444bb..38a5d59b3f 100644 --- a/source/jormungandr/jormungandr/scenarios/helper_classes/streetnetwork_path.py +++ b/source/jormungandr/jormungandr/scenarios/helper_classes/streetnetwork_path.py @@ -48,6 +48,10 @@ Dp_element = namedtuple("Dp_element", "origin, destination, response") +from opentelemetry import trace, baggage + + +tracer = trace.get_tracer(__name__) class StreetNetworkPath: @@ -67,6 +71,7 @@ def __init__( request, streetnetwork_path_type, request_id, + ctx=None, ): """ :param future_manager: a module that manages the future pool properly @@ -93,6 +98,7 @@ def __init__( self._logger = logging.getLogger(__name__) self._request_id = request_id self._best_dp = None + self._ctx = ctx self._async_request() @staticmethod @@ -196,27 +202,52 @@ def get_pt_object_destination(self, dp): return None def _do_request(self, origin, destination): - self._logger.debug( - "requesting %s direct path from %s to %s by %s", - self._path_type, - self._orig_obj.uri, - self._dest_obj.uri, - self._mode, - ) - - dp = self._direct_path_with_fp(self._streetnetwork_service, origin, destination) + if self._ctx: + self._logger.info("{}", self._ctx) + with tracer.start_as_current_span(name="direct_path", context=self._ctx): + self._logger.debug( + "requesting %s direct path from %s to %s by %s", + self._path_type, + self._orig_obj.uri, + self._dest_obj.uri, + self._mode, + ) + + dp = self._direct_path_with_fp(self._streetnetwork_service, origin, destination) + + if getattr(dp, "journeys", None): + dp.journeys[0].internal_id = str(utils.generate_id()) + + self._logger.debug( + "finish %s direct path from %s to %s by %s", + self._path_type, + self._orig_obj.uri, + self._dest_obj.uri, + self._mode, + ) + return Dp_element(origin, destination, dp) + else: + self._logger.debug( + "requesting %s direct path from %s to %s by %s", + self._path_type, + self._orig_obj.uri, + self._dest_obj.uri, + self._mode, + ) - if getattr(dp, "journeys", None): - dp.journeys[0].internal_id = str(utils.generate_id()) + dp = self._direct_path_with_fp(self._streetnetwork_service, origin, destination) - self._logger.debug( - "finish %s direct path from %s to %s by %s", - self._path_type, - self._orig_obj.uri, - self._dest_obj.uri, - self._mode, - ) - return Dp_element(origin, destination, dp) + if getattr(dp, "journeys", None): + dp.journeys[0].internal_id = str(utils.generate_id()) + + self._logger.debug( + "finish %s direct path from %s to %s by %s", + self._path_type, + self._orig_obj.uri, + self._dest_obj.uri, + self._mode, + ) + return Dp_element(origin, destination, dp) def _async_request(self): self._futures = [] @@ -263,11 +294,12 @@ class StreetNetworkPathPool: According to its usage, a StreetNetworkPath can be direct, beginning_fallback and ending_fallback """ - def __init__(self, future_manager, instance): + def __init__(self, future_manager, instance, ctx=None): self._future_manager = future_manager self._instance = instance self._value = {} self._direct_paths_future_by_mode = {} + self.ctx = ctx def add_async_request( self, @@ -300,6 +332,7 @@ def add_async_request( request, streetnetwork_path_type, request_id, + ctx=self.ctx, ) if streetnetwork_path_type is StreetNetworkPathType.DIRECT: self._direct_paths_future_by_mode[mode] = path diff --git a/source/jormungandr/jormungandr/scenarios/new_default.py b/source/jormungandr/jormungandr/scenarios/new_default.py index c052607f17..3e2c957b0a 100644 --- a/source/jormungandr/jormungandr/scenarios/new_default.py +++ b/source/jormungandr/jormungandr/scenarios/new_default.py @@ -111,6 +111,16 @@ CO2_ESTIMATION_COEFF = 1.35 +from opentelemetry import trace, baggage +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor +from opentelemetry.baggage.propagation import W3CBaggagePropagator + + +tracer = trace.get_tracer(__name__) + + def get_kraken_calls(request): """ return a list of tuple (departure fallback mode, arrival fallback mode, direct_path_type) @@ -1334,16 +1344,17 @@ def fill_journeys(self, request_type, api_request, instance): min_nb_journeys_left = min_nb_journeys - nb_qualified_journeys request['min_nb_journeys'] = max(0, min_nb_journeys_left) - new_resp = self.call_kraken( - pt_object_origin, - pt_object_destination, - request_type, - request, - instance, - krakens_call, - "{}_try_{}".format(request_id, nb_try), - distributed_context, - ) + with tracer.start_as_current_span("call_kraken"): + new_resp = self.call_kraken( + pt_object_origin, + pt_object_destination, + request_type, + request, + instance, + krakens_call, + "{}_try_{}".format(request_id, nb_try), + distributed_context, + ) _tag_by_mode(new_resp) _tag_direct_path(new_resp)