Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http/websocket: fix mid-size frames sometimes failing to be received #197

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

LBPHacker
Copy link

@LBPHacker LBPHacker commented Jan 17, 2022

This is more an educated guess as to why #140 exists combined with some pattern matching, than a well-researched, proof-backed solution. But it does fix a service I maintain that otherwise exhibits the problem described in the aforementioned issue.

Edit: I'm not sure what sort of test one would add for something like this, request for comments on that.

To verify that this indeed fixes what I think* is the problem, I first severely handicapped my lo:

ifconfig lo mtu 1500
tc qdisc change dev lo root netem rate 16kbit

And ran the following programs (code below):

./server.lua &
time ./client.lua
server.lua
#!/usr/bin/env lua5.3

local http_websocket = require("http.websocket")
local http_server    = require("http.server")
local http_headers   = require("http.headers")
local cqueues        = require("cqueues")

local function reply(myserver, stream)
	local req_headers = assert(stream:get_headers())
	local ws = assert(http_websocket.new_from_stream(stream, req_headers))
	ws:accept()
	while true do
		local data, typ = ws:receive()
		if not data then
			break
		end
		ws:send(data, typ)
	end
	ws:close(1000)
end

local myserver = assert(http_server.listen({
	host = "0.0.0.0",
	port = 36779,
	onstream = reply,
	onerror = function(myserver, context, op, err, errno)
		local msg = op .. " on " .. tostring(context) .. " failed"
		if err then
			msg = msg .. ": " .. tostring(err)
		end
		assert(io.stderr:write(msg, "\n"))
	end,
}))

assert(myserver:listen())
do
	local bound_port = select(3, myserver:localname())
	assert(io.stderr:write(string.format("Now listening on port %d\n", bound_port)))
end
assert(myserver:loop())
client.lua
#!/usr/bin/env lua5.3

local http_websocket = require("http.websocket")
local cqueues        = require("cqueues")
local errno          = require("cqueues.errno")

local large_str = ("z"):rep(10000)
local client = http_websocket.new_from_uri('ws://localhost:36779')
assert(client:connect())
client:send(large_str, "text")
local data, typ
while true do
	data, typ, errcode = client:receive(1)
	if data then
		break
	end
	if errcode ~= errno.ETIMEDOUT then
		error(typ)
	end
end
assert(#data == #large_str)
client:close(1000)

Without the fix applied, I get

onstream on http.h1_stream{connection=http.h1_connection{type="server";version=1.1};state="closed"} failed: /usr/share/lua/5.3/http/websocket.lua:282: read: Connection timed out

and the client never terminates. With the fix applied, I get

time ./client.lua

real	0m14.598s
user	0m0.023s
sys	0m0.004s

*: I see nothing else in the code that might cause the assertion to fail, but I'm unfamiliar with both the websocket RFCs and this codebase, so there may be things I'm missing.

@daurnimator
Copy link
Owner

Thankyou for investingating! This indeed looks like it could be the issue.

Are you able to create a test case: working backwards from the changed code, we should be able to reach+test this code by sending a frame of size >125 but less than 65536.

@LBPHacker
Copy link
Author

I'll try my best, but as I said, I'm unsure what sort of test one would add for testing this, as the conditions don't seem easily reproducible from Lua. I did at some point try interposing socket.xwrite to introduce artificial throttling and fragmentation, but it didn't end well.

@daurnimator
Copy link
Owner

'll try my best, but as I said, I'm unsure what sort of test one would add for testing this, as the conditions don't seem easily reproducible from Lua.

I think it would look similar to

it(name, function()
data_type = data_type or "text"
local cq = cqueues.new()
local c, s = new_pair()
cq:wrap(function()
assert(c:send(data, data_type))
assert.same({data, data_type}, {assert(c:receive())})
assert(c:close())
end)
cq:wrap(function()
assert.same({data, data_type}, {assert(s:receive())})
assert(s:send(data, data_type))
assert(s:close())
end)
assert_loop(cq, TEST_TIMEOUT)
assert.truthy(cq:empty())
end)
except without the :close() on the writing side until after the receive side has completed.

@daurnimator
Copy link
Owner

Oh I see: fill is eager and if you only have half the packet then the timeout of 0 is used.

Okay yeah, we need some way to only send half the message "now", half after a delay. Or to cap :fill.... thinking....

@LBPHacker
Copy link
Author

I'm pretty close to getting something working, will post in a bit.

@LBPHacker LBPHacker force-pushed the fix-websocket-midsize-read-fail branch from 217300f to 2513b20 Compare January 17, 2022 16:08
@LBPHacker
Copy link
Author

LBPHacker commented Jan 17, 2022

Got it. The test http.websocket module two sided tests works with medium size frames, even if the connection is really bad fails without the fix, with the error we know from and love in #140.

local real_xwrite
local fragments = 100
local delay = 1 / fragments -- Aim for 1s.
real_xwrite = cs.interpose("xwrite", function(self, str, mode, timeout)
Copy link
Owner

@daurnimator daurnimator Jan 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to interpose the whole library: you should be able to overwrite just the specific instance:

local c, s = new_pair()
local real_xwrite = s.socket.xwrite
s.socket.xwrite = ffunction(self, str, mode, timeout)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just seemed like the intended way to do it, but sure, I could do that.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah never mind, I remember now why I did it like this. CQS sockets are userdata, you can't just override their methods like that.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to alternatives.

end
before_first = before_first + nbytes
cqueues.sleep(delay)
until before_first > #str
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably be >=. > works because xwriting an empty string happens to be ok.

@LBPHacker
Copy link
Author

Are we in a deadlock here? I'm waiting for a response on the interposition thing, maybe you're also waiting for something?

@LBPHacker
Copy link
Author

60-day poke.

@daurnimator
Copy link
Owner

Are we in a deadlock here? I'm waiting for a response on the interposition thing, maybe you're also waiting for something?

Sorry, I'm low on free time lately.
I don't love the approach taken for the tests here, and my plan is to sit down one night and try to re-work it.
Maybe I won't come up with anything better than what you already have; or maybe it will lead to a larger refactor/change.

@LBPHacker
Copy link
Author

Alright, noted. Excuse the poking, I was just trying to make sure I hadn't derailed some sort of 'workflow' by leaving a review comment or something.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants