diff --git a/lib/nostrum/api/ratelimiter.ex b/lib/nostrum/api/ratelimiter.ex index 62743b5bf..b193c8c34 100644 --- a/lib/nostrum/api/ratelimiter.ex +++ b/lib/nostrum/api/ratelimiter.ex @@ -40,15 +40,11 @@ defmodule Nostrum.Api.Ratelimiter do ## Multi-node - If a single global process is desired to handle all ratelimiting, the - ratelimiter can theoretically be adjusted to start registered via `:global`. - In practice, it may be more beneficial to have a local ratelimiter process on - each node and either using the local one for any API calls, or using a - consistent hash mechanism to distribute API requests around the cluster as - needed. **Do note that the API enforces a global user ratelimit across all - requests**. With a single process, the ratelimiter can track this without - hitting 429s at all, with multiple ratelimiters, the built-in requeue - functionality may or may not help. + nostrum will transparently distribute client requests across all ratelimiter + clusters running in the cluster. This allows us to account for per-route + ratelimits whilst still distributing work across cluster nodes. **Note that + the API enforces a global user ratelimit across all requests**, which we + cannot account for using this method. ## Inner workings @@ -202,6 +198,7 @@ defmodule Nostrum.Api.Ratelimiter do @behaviour :gen_statem alias Nostrum.Api.Base + alias Nostrum.Api.RatelimiterGroup alias Nostrum.Constants alias Nostrum.Error.ApiError @@ -311,6 +308,7 @@ defmodule Nostrum.Api.Ratelimiter do end def init(token) when is_binary(token) do + :ok = RatelimiterGroup.join(self()) # Uncomment the following to trace everything the ratelimiter is doing: # me = self() # spawn(fn -> :sys.trace(me, true) end) @@ -980,7 +978,9 @@ defmodule Nostrum.Api.Ratelimiter do will cause this to return an error. """ def queue(request) do - :gen_statem.call(@registered_name, {:queue, request}) + bucket = get_endpoint(request.route, request.method) + limiter = RatelimiterGroup.limiter_for_bucket(bucket) + :gen_statem.call(limiter, {:queue, request}) end @spec value_from_rltuple({String.t(), String.t()}) :: String.t() | nil @@ -1027,7 +1027,7 @@ defmodule Nostrum.Api.Ratelimiter do @doc """ Retrieves a proper ratelimit endpoint from a given route and url. """ - @spec get_endpoint(String.t(), String.t()) :: String.t() + @spec get_endpoint(String.t(), atom()) :: String.t() def get_endpoint(route, method) do endpoint = Regex.replace(~r/\/([a-z-]+)\/(?:[0-9]{17,19})/i, route, fn capture, param -> diff --git a/lib/nostrum/api/ratelimiter_group.ex b/lib/nostrum/api/ratelimiter_group.ex new file mode 100644 index 000000000..0e2dc53b7 --- /dev/null +++ b/lib/nostrum/api/ratelimiter_group.ex @@ -0,0 +1,68 @@ +defmodule Nostrum.Api.RatelimiterGroup do + @moduledoc """ + Tracks ratelimiters and determines correct ratelimiters to use per request. + + ## Purpose + + In a multi-node setup, users want to be able to make API requests from any + node in the cluster without having to worry about hitting ratelimits. This + module serves as the mediator between API clients on any nodes and their + target ratelimiter. + + > ### Internal module {: .info} + > + > This module is intended for exclusive usage inside of nostrum, and is + > documented for completeness and people curious to look behind the covers. + + ## Approach + + A naive implementation might simply forward requests to the locally (on the + same node) running ratelimiter. However, this falls short when modules on + other nodes want to make API requests, as they then effectively begin + tracking their own ratelimit state, rendering it inconsistent. + + Instead, the approach is that we have a locally running ratelimiter on each + node, all of which are registered via the `:pg` process group managed by this + module. When an API request comes in, we determine its ratelimit bucket (see + `Nostrum.Api.Ratelimiter.get_endpoint/2`) and based on that, determine the + target ratelimiter by selecting it from the list of known ratelimiters via + `:erlang.phash2/2`. + """ + + @scope_name __MODULE__ + @group_name :ratelimiters + + @doc """ + Return a ratelimiter PID to use for requests to the given ratelimiter `bucket`. + """ + @spec limiter_for_bucket(String.t()) :: pid() + def limiter_for_bucket(bucket) do + limiters = :pg.get_members(@scope_name, @group_name) + # "Processes are returned in no specific order." + sorted = Enum.sort(limiters) + total = length(sorted) + selected = :erlang.phash2(bucket, total) + Enum.at(sorted, selected) + end + + @doc "Join the given ratelimiter to the group." + @spec join(pid()) :: :ok + def join(pid) do + :pg.join(@scope_name, @group_name, pid) + end + + # Supervisor API + def start_link(_opts) do + :pg.start_link(@scope_name) + end + + def child_spec(opts) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [opts]}, + type: :worker, + restart: :permanent, + shutdown: 500 + } + end +end diff --git a/lib/nostrum/application.ex b/lib/nostrum/application.ex index 283651259..69dc330fe 100644 --- a/lib/nostrum/application.ex +++ b/lib/nostrum/application.ex @@ -29,6 +29,7 @@ defmodule Nostrum.Application do children = [ Nostrum.Store.Supervisor, Nostrum.ConsumerGroup, + Nostrum.Api.RatelimiterGroup, {Nostrum.Api.Ratelimiter, {Application.fetch_env!(:nostrum, :token), []}}, Nostrum.Shard.Connector, Nostrum.Cache.CacheSupervisor,