Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Opentelemetry] Add opentelemetry trace #4275

Draft
wants to merge 2 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions source/jormungandr/jormungandr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,23 @@
from flask_cors import CORS
from jormungandr import init


#### OpenTelemetry traces configuration ######################################
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import set_tracer_provider

tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
set_tracer_provider(tracer_provider)

app = Flask(__name__) # type: Flask

# Flask auto-instrumentation
from opentelemetry.instrumentation.flask import FlaskInstrumentor

FlaskInstrumentor().instrument_app(app)

init.load_configuration(app)
init.logger(app)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ def __init__(self, output_type_serializer):
'occupancy',
'arrival_stop_attractivity',
'departure_stop_attractivity',
'pseudo_duration',
]
),
help="choose the criteria used to compute pt journeys, feature in beta ",
Expand Down
6 changes: 6 additions & 0 deletions source/jormungandr/jormungandr/pt_planners/pt_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def __init__(
criteria=None,
olympic_site_params=None,
language="fr-FR",
departure_coord=None,
arrival_coord=None,
max_radius_to_free_access=None,
):

self.max_duration = max_duration
Expand All @@ -98,6 +101,9 @@ def __init__(
self.criteria = criteria
self.olympic_site_params = olympic_site_params or {}
self.language = language
self.departure_coord = departure_coord
self.arrival_coord = arrival_coord
self.max_radius_to_free_access = max_radius_to_free_access


# Needed for GraphicalIsochrones
Expand Down
48 changes: 33 additions & 15 deletions source/jormungandr/jormungandr/scenarios/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@
StreetNetworkException,
)

from opentelemetry import trace, baggage


tracer = trace.get_tracer(__name__)


class PartialResponseContext(object):
requested_orig = None
Expand Down Expand Up @@ -138,14 +143,16 @@ def _compute_journeys(
}
requested_direct_path_modes.update(requested_dep_modes_with_pt)

parent_ctx = baggage.set_baggage("compute_journeys", "parent")

if context.partial_response_is_empty:
logger.debug('requesting places by uri orig: %s dest %s', request['origin'], request['destination'])

context.requested_orig_obj = pt_object_origin_detail
context.requested_dest_obj = pt_object_destination_detail

context.streetnetwork_path_pool = StreetNetworkPathPool(
future_manager=future_manager, instance=instance
future_manager=future_manager, instance=instance, ctx=parent_ctx
)

period_extremity = PeriodExtremity(request['datetime'], request['clockwise'])
Expand Down Expand Up @@ -203,6 +210,7 @@ def _compute_journeys(
request_id="{}_crowfly_orig".format(request_id),
o_d_crowfly_distance=crowfly_distance,
direct_path_timeout=direct_path_timeout,
ctx=parent_ctx,
)

context.dest_proximities_by_crowfly = ProximitiesByCrowflyPool(
Expand All @@ -216,6 +224,7 @@ def _compute_journeys(
request_id="{}_crowfly_dest".format(request_id),
o_d_crowfly_distance=crowfly_distance,
direct_path_timeout=direct_path_timeout,
ctx=parent_ctx,
)

context.orig_places_free_access = PlacesFreeAccess(
Expand Down Expand Up @@ -245,6 +254,7 @@ def _compute_journeys(
direct_path_type=StreetNetworkPathType.BEGINNING_FALLBACK,
request_id="{}_fallback_orig".format(request_id),
direct_path_timeout=direct_path_timeout,
ctx=parent_ctx,
)

context.dest_fallback_durations_pool = FallbackDurationsPool(
Expand All @@ -259,6 +269,7 @@ def _compute_journeys(
direct_path_type=StreetNetworkPathType.ENDING_FALLBACK,
request_id="{}_fallback_dest".format(request_id),
direct_path_timeout=direct_path_timeout,
ctx=parent_ctx,
)

pt_journey_pool = PtJourneyPool(
Expand All @@ -273,6 +284,7 @@ def _compute_journeys(
request=request,
request_type=request_type,
request_id="{}_ptjourney".format(request_id),
ctx=parent_ctx,
)

pt_journey_elements = wait_and_build_crowflies(
Expand Down Expand Up @@ -319,7 +331,11 @@ def finalise_journeys(self, future_manager, request, responses, context, instanc
Fallbacks will only be computed for journeys not tagged as 'to_delete'
"""

streetnetwork_path_pool = StreetNetworkPathPool(future_manager=future_manager, instance=instance)
parent_ctx = baggage.set_baggage("finalise_journeys", "parent")

streetnetwork_path_pool = StreetNetworkPathPool(
future_manager=future_manager, instance=instance, ctx=parent_ctx
)

journeys_to_complete = get_journeys_to_complete(responses, context, is_debug)

Expand Down Expand Up @@ -514,16 +530,17 @@ def call_kraken(
type_pb2.ISOCHRONE,
)
elif request_type == type_pb2.PLANNER:
return self._scenario._compute_journeys(
future_manager,
pt_object_origin_detail,
pt_object_destination_detail,
request,
instance,
krakens_call,
context,
type_pb2.PLANNER,
)
with tracer.start_as_current_span(name="distributed_compute_journeys"):
return self._scenario._compute_journeys(
future_manager,
pt_object_origin_detail,
pt_object_destination_detail,
request,
instance,
krakens_call,
context,
type_pb2.PLANNER,
)
else:
abort(400, message="This type of request is not supported with distributed")
except PtException as e:
Expand All @@ -542,9 +559,10 @@ def finalise_journeys(self, request, responses, context, instance, is_debug, req
with FutureManager(self.greenlet_pool_size) as future_manager, timed_logger(
logger, 'finalise_journeys', "{}_finalise_journeys".format(request_id)
):
self._scenario.finalise_journeys(
future_manager, request, responses, context, instance, is_debug, request_id
)
with tracer.start_as_current_span(name="finalise_journeys"):
self._scenario.finalise_journeys(
future_manager, request, responses, context, instance, is_debug, request_id
)

from jormungandr.scenarios import journey_filter

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@

AccessMapElement = namedtuple('AccessMapElement', ['stop_point_uri', 'access_point'])

from opentelemetry import trace, baggage


tracer = trace.get_tracer(__name__)


class FallbackDurations:
"""
Expand Down Expand Up @@ -95,6 +100,7 @@ def __init__(
speed_switcher,
request_id,
direct_path_type=StreetNetworkPathType.BEGINNING_FALLBACK,
ctx=None,
):
"""
:param future_manager: a module that manages the future pool properly
Expand Down Expand Up @@ -122,6 +128,7 @@ def __init__(
self._streetnetwork_service = instance.get_street_network(mode, request)
self._logger = logging.getLogger(__name__)
self._request_id = request_id
self._ctx = ctx
self._async_request()

def _get_duration(self, resp, place):
Expand Down Expand Up @@ -358,29 +365,30 @@ def _determine_centers_isochrone(self):
return result

def _do_request(self, futures, centers_isochrone, stop_points):
if len(futures) == 1:
return futures[0].wait_and_get()
else:
fallback_duration = dict()
for place_isochrone in stop_points:
best_duration = float("inf")
best_element = None
for index, future in enumerate(futures):
fallback = future.wait_and_get()
element = fallback.get(place_isochrone.uri)
if element and element.duration < best_duration:
best_duration = element.duration
best_element = DurationElement(
element.duration,
element.status,
element.car_park,
element.car_park_crowfly_duration,
element.via_pt_access,
centers_isochrone[index],
)
if best_element:
fallback_duration[place_isochrone.uri] = best_element
return fallback_duration
with tracer.start_as_current_span(name="fallback_durations", context=self._ctx):
if len(futures) == 1:
return futures[0].wait_and_get()
else:
fallback_duration = dict()
for place_isochrone in stop_points:
best_duration = float("inf")
best_element = None
for index, future in enumerate(futures):
fallback = future.wait_and_get()
element = fallback.get(place_isochrone.uri)
if element and element.duration < best_duration:
best_duration = element.duration
best_element = DurationElement(
element.duration,
element.status,
element.car_park,
element.car_park_crowfly_duration,
element.via_pt_access,
centers_isochrone[index],
)
if best_element:
fallback_duration[place_isochrone.uri] = best_element
return fallback_duration

def _async_request(self):
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -505,6 +513,7 @@ def __init__(
request_id,
direct_path_timeout,
direct_path_type=StreetNetworkPathType.BEGINNING_FALLBACK,
ctx=None,
):
super(FallbackDurationsPool, self).__init__()
self._future_manager = future_manager
Expand All @@ -522,6 +531,7 @@ def __init__(
self._request_id = request_id

self._overrided_uri_map = defaultdict(dict)
self._ctx = ctx
self._async_request(direct_path_timeout)

@property
Expand Down Expand Up @@ -560,6 +570,7 @@ def _async_request(self, direct_path_timeout):
self._speed_switcher,
"{}_{}".format(self._request_id, mode),
self._direct_path_type,
self._ctx,
)
self._value[mode] = fallback_durations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
import logging
from navitiacommon import type_pb2

from opentelemetry import trace, baggage


tracer = trace.get_tracer(__name__)


class ProximitiesByCrowfly:
"""
Expand All @@ -54,6 +59,7 @@ def __init__(
request,
request_id,
depth,
ctx,
):
self._future_manager = future_manager
self._instance = instance
Expand All @@ -72,6 +78,7 @@ def __init__(
self._forbidden_uris = utils.get_poi_params(request['forbidden_uris[]'])
self._allowed_id = utils.get_poi_params(request['allowed_id[]'])
self._pt_planner = self._instance.get_pt_planner(request['_pt_planner'])
self._ctx = ctx
self._async_request()

@new_relic.distributedEvent("get_crowfly", "street_network")
Expand All @@ -98,32 +105,35 @@ def _get_crow_fly(self):
)

def _do_request(self):
logger = logging.getLogger(__name__)

# When max_duration_to_pt is 0, there is no need to compute the fallback to pt, except if place is a stop_point
# or a stop_area
if self._max_duration == 0:
logger.debug("max duration equals to 0, no need to compute proximities by crowfly")

# When max_duration_to_pt is 0, we can get on the public transport ONLY if the place is a stop_point
if self._instance.georef.get_stop_points_from_uri(self._requested_place_obj.uri, self._request_id):
return [self._requested_place_obj]

coord = utils.get_pt_object_coord(self._requested_place_obj)
if coord.lat and coord.lon:
crow_fly = self._get_crow_fly(self._instance.georef)

if self._mode == fm.FallbackModes.car.name:
# pick up only sytral_parkings with park_ride = yes
crow_fly = jormungandr.street_network.utils.pick_up_park_ride_car_park(crow_fly)

logger.debug(
"finish proximities by crowfly from %s in %s", self._requested_place_obj.uri, self._mode
)
return crow_fly
with tracer.start_as_current_span(name="proximities_by_crowfly", context=self._ctx):
logger = logging.getLogger(__name__)

# When max_duration_to_pt is 0, there is no need to compute the fallback to pt, except if place is a stop_point
# or a stop_area
if self._max_duration == 0:
logger.debug("max duration equals to 0, no need to compute proximities by crowfly")

# When max_duration_to_pt is 0, we can get on the public transport ONLY if the place is a stop_point
if self._instance.georef.get_stop_points_from_uri(
self._requested_place_obj.uri, self._request_id
):
return [self._requested_place_obj]

coord = utils.get_pt_object_coord(self._requested_place_obj)
if coord.lat and coord.lon:
crow_fly = self._get_crow_fly(self._instance.georef)

if self._mode == fm.FallbackModes.car.name:
# pick up only sytral_parkings with park_ride = yes
crow_fly = jormungandr.street_network.utils.pick_up_park_ride_car_park(crow_fly)

logger.debug(
"finish proximities by crowfly from %s in %s", self._requested_place_obj.uri, self._mode
)
return crow_fly

logger.debug("the coord of requested places is not valid: %s", coord)
return []
logger.debug("the coord of requested places is not valid: %s", coord)
return []

def _async_request(self):
self._value = self._future_manager.create_future(self._do_request)
Expand All @@ -145,6 +155,7 @@ def __init__(
request_id,
o_d_crowfly_distance,
direct_path_timeout,
ctx,
):
"""
A ProximitiesByCrowflyPool is a set of ProximitiesByCrowfly grouped by mode
Expand Down Expand Up @@ -176,6 +187,7 @@ def __init__(
self._value = {}
self._request_id = request_id
self._o_d_crowfly_distance = o_d_crowfly_distance
self._ctx = ctx
self._async_request(direct_path_timeout)

def _async_request(self, direct_path_timeout):
Expand Down Expand Up @@ -222,6 +234,7 @@ def _async_request(self, direct_path_timeout):
request=self._request,
request_id=self._request_id,
depth=depth,
ctx=self._ctx,
)

self._value[mode] = p
Expand Down
Loading
Loading