-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Set sender/receiver capabilities to attach frame in the amqp10_client #11337
base: main
Are you sure you want to change the base?
Conversation
26fedc1
to
a24e3f6
Compare
9b524cd
to
52cd6be
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest that we first test end-to-end that the RabbitMQ <-> IBM MQ integration works before merging this PR.
Right now, this PR adds a lot of boilerplate code that is not used and contains bugs.
attach_sender_link/3, | ||
attach_sender_link/4, | ||
attach_sender_link/5, | ||
attach_sender_link/6, | ||
attach_sender_link_sync/3, | ||
attach_sender_link_sync/4, | ||
attach_sender_link_sync/5, | ||
attach_sender_link_sync/6, | ||
attach_receiver_link/3, | ||
attach_receiver_link/4, | ||
attach_receiver_link/5, | ||
attach_receiver_link/6, | ||
attach_receiver_link/7, | ||
attach_receiver_link/8, | ||
attach_link/2, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this change the API of this client lib contains 15 different functions to attach a link.
This is poor API design. We should keep the API minimal. Instead of adding 3 new attach functions to the API, I suggest that applications can start to set capabilities by using attach_link/2
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will look into using attach_link
. When I was doing it I thought there could be another way of doing this like setting a structure like Attach_args and passing it . But the API already presented this pattern or design . I just followed it.
translate_terminus_capabilities(none) -> | ||
undefined; | ||
translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) -> | ||
{utf8, Capabilities}; | ||
translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) -> | ||
{list, [filter_capability(V) || V <- CapabilitiesList]}. | ||
|
||
filter_capability(V) when is_binary(V) -> | ||
{utf8, V}; | ||
filter_capability({T, _} = V) when is_atom(T) -> | ||
%% looks like an already tagged type, just pass it through | ||
V. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong AMQP types are used:
- Capabilities are symbols, not strings.
- Multiple values of a composite type are represented by an array, not by a list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is encoded using array and symbol. See the tests https://github.com/rabbitmq/rabbitmq-server/blob/set-amqp10-capabilities/deps/amqp10_client/test/system_SUITE.erl#L935 where we send an array of symbols.
The API exposes or accepts Erlang types from users where it accepts binaries (strings) and list of binaries but the library is serliazing/deserlizing it as an array of symbols .. It is more or less how we treat Filters ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is true that multiple values are encoded using array
rather than list
.
d8f7a42
to
dd3e169
Compare
182a482
to
2563327
Compare
0c1536a
to
db6ef7b
Compare
db6ef7b
to
b98e7e1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get sometimes the following error
system_SUITE > ibmmq > basic_roundtrip_ibmmq
#1. {error,
{shutdown,
{gen_statem,call,
[<0.540.0>,
{attach,
#{name => <<"banana-sender">>,filter => #{},
properties => #{},rcv_settle_mode => first,
role =>
{sender,
#{address => <<"DEV.QUEUE.3">>,
capabilities => <<"queue">>,
durable => unsettled_state}},
snd_settle_mode => settled}},
5000]}}}
when running
make -C deps/amqp10_client/ ct-system
locally on Ubuntu.
Might be worth checking this flake?
@@ -19,7 +19,7 @@ | |||
end_session/1, | |||
attach_sender_link/3, | |||
attach_sender_link/4, | |||
attach_sender_link/5, | |||
attach_sender_link/5, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove all additional white spaces at the end of lines.
@@ -166,6 +166,8 @@ end_session(Pid) -> | |||
%% for the link before returning. | |||
attach_sender_link_sync(Session, Name, Target) -> | |||
attach_sender_link_sync(Session, Name, Target, mixed). | |||
-spec attach_sender_link_sync(pid(), binary(), binary()) -> | |||
{ok, link_ref()} | link_timeout. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put function spec before the function as done in other places.
@@ -109,6 +112,7 @@ | |||
-export_type([snd_settle_mode/0, | |||
rcv_settle_mode/0, | |||
terminus_durability/0, | |||
terminus_capabilities/0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This export doesn't seem to be used outside. We should remove it.
{symbol, V}; | ||
filter_capability({T, _} = V) when is_atom(T) -> | ||
%% looks like an already tagged type, just pass it through | ||
V. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code was wrongly copied from filter_value_type/1
.
A filter value has more permissive types.
For capabilities, only symbols are allowed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's best to only allow binaries in the list of capabilities provided by the client.
We then map it to an array of symbols here.
@@ -73,11 +73,14 @@ | |||
-type rcv_settle_mode() :: first | second. | |||
|
|||
-type terminus_durability() :: none | configuration | unsettled_state. | |||
-type terminus_capabilities() :: none | binary() | list(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
none
doesn't make sense.
Looks like this was wrongly copied from terminus_durability
where none
maps to 0
(see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-terminus-durability).
If the client app doesn't want to set any capabilities, just don't set it in the target / source map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's refine the list()
type further to by of type [binary(),...]
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice refinement ! Thanks
after 5000 -> exit(connection_timeout) | ||
end, | ||
ct:log("Closing connection ..."), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These log statements seem to be useless. Can we remove them?
ok = roundtrip(OpenConf, [{body, Data64Mb}]). | ||
|
||
basic_roundtrip_ibmmq(Config) -> | ||
application:start(sasl), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to start the Erlang sasl
application here?
Is https://www.erlang.org/doc/apps/sasl/error_logging.html being confused with https://datatracker.ietf.org/doc/html/rfc4422 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran the test without this line, and the test succeeds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed . In fact, it is not needed in the function basic_roundtrip
either. I removed it from there and the tests succeeds.
properties => #{} | ||
}, | ||
{ok, Sender} = amqp10_client:attach_link(Session, SenderAttachArgs), | ||
%%await_link(Sender, credited, link_credit_timeout), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete commented code.
{ok, OutMsg} = amqp10_client:get_msg(Receiver, 4 * 60_000), | ||
ok = amqp10_client:end_session(Session), | ||
ok = amqp10_client:close_connection(Connection), | ||
|
||
% ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]), | ||
?assertMatch(Props, amqp10_msg:properties(OutMsg)), | ||
ActualProps = amqp10_msg:properties(OutMsg), | ||
[ ?assertEqual(V, maps:get(K, ActualProps)) || K := V <- Props, K =/= creation_time], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The creation-time must not be changed by the broker as it's part of the properties which are part of the bare message and therefore immutable. If IBM MQ changes this creation time, this is a bug in IBM MQ. Can we maintain the assertion that creation time matches, and only omit this assertion for the buggy IBM MQ?
|
||
filtered_roundtrip(OpenConf, Args) -> | ||
Body = proplists:get_value(body, Args, <<"banana">>), | ||
Destination = proplists:get_value(destination, Args, <<"test1">>), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we simplify this again as done prior to this PR?
Here, body is always banana
and Destination is always test1
.
There is no need to use proplists:get_value/3
here.
21e5872
to
e110ecb
Compare
e110ecb
to
70839ed
Compare
Proposed Changes
It partially addresses the feature request #10752. This PR lets a developer set the capabilities to the sender and receiver links in the amqp10_client. It will not add this capability to the shovel plugin though.
The system test group
ibmmq
requires a docker image for IBM MQ server. This docker image is built by a workflow added by this other PR #11419. The official IBM MQ docker image does not come with AMQP built-in capability hence we have to build one. And given the amount of time it takes to build this image it was better to build it from its own CI workflow.NOTE for reviewer:
ibmmq
, it pulls a docker image based on amd64 arch. It does not take into account the local arch because the other possible architecture is arm64 and it is not possible to build a docker image for ARM64 with AMQP capabilities. (see https://github.com/rabbitmq/rabbitmq-server/blob/set-amqp10-capabilities/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner#L11 for more context)Types of Changes
What types of changes does your code introduce to this project?
Put an
x
in the boxes that apply