Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http/websocket: fix mid-size frames sometimes failing to be received #197

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion http/websocket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ local function read_frame(sock, deadline)
if frame.length == 126 then
extra_fill_unget = assert(sock:xread(2, "b", 0))
frame.length = sunpack(">I2", extra_fill_unget)
fill_length = fill_length - 2
fill_length = fill_length - 2 + frame.length
elseif frame.length == 127 then
extra_fill_unget = assert(sock:xread(8, "b", 0))
frame.length = sunpack(">I8", extra_fill_unget)
Expand Down
71 changes: 55 additions & 16 deletions spec/websocket_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -243,23 +243,62 @@ describe("http.websocket module two sided tests", function()
assert.truthy(cq:empty())
end)
local function send_receive_test(name, data, data_type)
it(name, function()
data_type = data_type or "text"
local cq = cqueues.new()
local c, s = new_pair()
cq:wrap(function()
assert(c:send(data, data_type))
assert.same({data, data_type}, {assert(c:receive())})
assert(c:close())
local function do_test(bad_connection)
local bname = name
if bad_connection then
bname = bname .. ", even if the connection is really bad"
end
it(bname, function()
data_type = data_type or "text"
if bad_connection then
local real_xwrite
local fragments = 100
local delay = 1 / fragments -- Aim for 1s.
real_xwrite = cs.interpose("xwrite", function(self, str, mode, timeout)
Copy link
Owner

@daurnimator daurnimator Jan 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to interpose the whole library: you should be able to overwrite just the specific instance:

local c, s = new_pair()
local real_xwrite = s.socket.xwrite
s.socket.xwrite = ffunction(self, str, mode, timeout)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just seemed like the intended way to do it, but sure, I could do that.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah never mind, I remember now why I did it like this. CQS sockets are userdata, you can't just override their methods like that.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to alternatives.

if mode ~= "bn" then -- Not interesting, don't throttle.
return real_xwrite(self, str, mode, timeout)
end
local deadline
if timeout then
deadline = cqueues.monotime() + timeout
end
local ok, op, why
local nbytes = math.ceil(#str / fragments)
local before_first = 0
repeat
-- Test range at the end to ensure that real_xwrite is called at least once.
-- We rely on the fact here that :sub sanitizes the input range.
ok, op, why = real_xwrite(self, str:sub(before_first + 1, before_first + nbytes), mode, deadline and (deadline - cqueues.monotime()))
if not ok then
break
end
before_first = before_first + nbytes
cqueues.sleep(delay)
until before_first > #str
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably be >=. > works because xwriting an empty string happens to be ok.

return ok, op, why
end)
finally(function()
cs.interpose("xwrite", real_xwrite)
end)
end
local cq = cqueues.new()
local c, s = new_pair()
cq:wrap(function()
assert(c:send(data, data_type))
assert.same({data, data_type}, {assert(c:receive())})
assert(c:close())
end)
cq:wrap(function()
assert.same({data, data_type}, {assert(s:receive())})
assert(s:send(data, data_type))
assert(s:close())
end)
assert_loop(cq, TEST_TIMEOUT)
assert.truthy(cq:empty())
end)
cq:wrap(function()
assert.same({data, data_type}, {assert(s:receive())})
assert(s:send(data, data_type))
assert(s:close())
end)
assert_loop(cq, TEST_TIMEOUT)
assert.truthy(cq:empty())
end)
end
do_test(false)
do_test(true)
end
send_receive_test("works with small size frames", "f")
send_receive_test("works with medium size frames", ("f"):rep(200))
Expand Down