diff --git a/http/websocket.lua b/http/websocket.lua index fe53f63..6c611a4 100644 --- a/http/websocket.lua +++ b/http/websocket.lua @@ -241,7 +241,7 @@ local function read_frame(sock, deadline) if frame.length == 126 then extra_fill_unget = assert(sock:xread(2, "b", 0)) frame.length = sunpack(">I2", extra_fill_unget) - fill_length = fill_length - 2 + fill_length = fill_length - 2 + frame.length elseif frame.length == 127 then extra_fill_unget = assert(sock:xread(8, "b", 0)) frame.length = sunpack(">I8", extra_fill_unget) diff --git a/spec/websocket_spec.lua b/spec/websocket_spec.lua index d0f49e6..47befee 100644 --- a/spec/websocket_spec.lua +++ b/spec/websocket_spec.lua @@ -243,23 +243,62 @@ describe("http.websocket module two sided tests", function() assert.truthy(cq:empty()) end) local function send_receive_test(name, data, data_type) - it(name, function() - data_type = data_type or "text" - local cq = cqueues.new() - local c, s = new_pair() - cq:wrap(function() - assert(c:send(data, data_type)) - assert.same({data, data_type}, {assert(c:receive())}) - assert(c:close()) + local function do_test(bad_connection) + local bname = name + if bad_connection then + bname = bname .. ", even if the connection is really bad" + end + it(bname, function() + data_type = data_type or "text" + if bad_connection then + local real_xwrite + local fragments = 100 + local delay = 1 / fragments -- Aim for 1s. + real_xwrite = cs.interpose("xwrite", function(self, str, mode, timeout) + if mode ~= "bn" then -- Not interesting, don't throttle. + return real_xwrite(self, str, mode, timeout) + end + local deadline + if timeout then + deadline = cqueues.monotime() + timeout + end + local ok, op, why + local nbytes = math.ceil(#str / fragments) + local before_first = 0 + repeat + -- Test range at the end to ensure that real_xwrite is called at least once. + -- We rely on the fact here that :sub sanitizes the input range. + ok, op, why = real_xwrite(self, str:sub(before_first + 1, before_first + nbytes), mode, deadline and (deadline - cqueues.monotime())) + if not ok then + break + end + before_first = before_first + nbytes + cqueues.sleep(delay) + until before_first > #str + return ok, op, why + end) + finally(function() + cs.interpose("xwrite", real_xwrite) + end) + end + local cq = cqueues.new() + local c, s = new_pair() + cq:wrap(function() + assert(c:send(data, data_type)) + assert.same({data, data_type}, {assert(c:receive())}) + assert(c:close()) + end) + cq:wrap(function() + assert.same({data, data_type}, {assert(s:receive())}) + assert(s:send(data, data_type)) + assert(s:close()) + end) + assert_loop(cq, TEST_TIMEOUT) + assert.truthy(cq:empty()) end) - cq:wrap(function() - assert.same({data, data_type}, {assert(s:receive())}) - assert(s:send(data, data_type)) - assert(s:close()) - end) - assert_loop(cq, TEST_TIMEOUT) - assert.truthy(cq:empty()) - end) + end + do_test(false) + do_test(true) end send_receive_test("works with small size frames", "f") send_receive_test("works with medium size frames", ("f"):rep(200))