Skip to content

Commit f0e8172

Browse files
committed
ok
1 parent e1de756 commit f0e8172

File tree

6 files changed

+341
-69
lines changed

6 files changed

+341
-69
lines changed

src/bun.js/event_loop.zig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ pub fn pipeReadBuffer(this: *const EventLoop) []u8 {
9898
}
9999

100100
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
101-
const log = bun.Output.scoped(.EventLoop, .visible);
101+
const log = bun.Output.scoped(.EventLoop, .hidden);
102102

103103
pub fn tickWhilePaused(this: *EventLoop, done: *bool) void {
104104
while (!done.*) {

src/bun.js/webcore/fetch.zig

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ pub const FetchTasklet = struct {
108108
// custom checkServerIdentity
109109
check_server_identity: jsc.Strong.Optional = .empty,
110110
reject_unauthorized: bool = true,
111+
is_websocket_upgrade: bool = false,
111112
// Custom Hostname
112113
hostname: ?[]u8 = null,
113114
is_waiting_body: bool = false,
@@ -1069,6 +1070,7 @@ pub const FetchTasklet = struct {
10691070
.memory_reporter = fetch_options.memory_reporter,
10701071
.check_server_identity = fetch_options.check_server_identity,
10711072
.reject_unauthorized = fetch_options.reject_unauthorized,
1073+
.is_websocket_upgrade = fetch_options.is_websocket_upgrade,
10721074
};
10731075

10741076
fetch_tasklet.signals = fetch_tasklet.signal_store.to();
@@ -1201,19 +1203,23 @@ pub const FetchTasklet = struct {
12011203
// dont have backpressure so we will schedule the data to be written
12021204
// if we have backpressure the onWritable will drain the buffer
12031205
needs_schedule = stream_buffer.isEmpty();
1204-
//16 is the max size of a hex number size that represents 64 bits + 2 for the \r\n
1205-
var formated_size_buffer: [18]u8 = undefined;
1206-
const formated_size = std.fmt.bufPrint(
1207-
formated_size_buffer[0..],
1208-
"{x}\r\n",
1209-
.{data.len},
1210-
) catch |err| switch (err) {
1211-
error.NoSpaceLeft => unreachable,
1212-
};
1213-
bun.handleOom(stream_buffer.ensureUnusedCapacity(formated_size.len + data.len + 2));
1214-
stream_buffer.writeAssumeCapacity(formated_size);
1215-
stream_buffer.writeAssumeCapacity(data);
1216-
stream_buffer.writeAssumeCapacity("\r\n");
1206+
if (this.is_websocket_upgrade) {
1207+
bun.handleOom(stream_buffer.write(data));
1208+
} else {
1209+
//16 is the max size of a hex number size that represents 64 bits + 2 for the \r\n
1210+
var formated_size_buffer: [18]u8 = undefined;
1211+
const formated_size = std.fmt.bufPrint(
1212+
formated_size_buffer[0..],
1213+
"{x}\r\n",
1214+
.{data.len},
1215+
) catch |err| switch (err) {
1216+
error.NoSpaceLeft => unreachable,
1217+
};
1218+
bun.handleOom(stream_buffer.ensureUnusedCapacity(formated_size.len + data.len + 2));
1219+
stream_buffer.writeAssumeCapacity(formated_size);
1220+
stream_buffer.writeAssumeCapacity(data);
1221+
stream_buffer.writeAssumeCapacity("\r\n");
1222+
}
12171223

12181224
// pause the stream if we hit the high water mark
12191225
return stream_buffer.size() >= highWaterMark;
@@ -1271,6 +1277,7 @@ pub const FetchTasklet = struct {
12711277
check_server_identity: jsc.Strong.Optional = .empty,
12721278
unix_socket_path: ZigString.Slice,
12731279
ssl_config: ?*SSLConfig = null,
1280+
is_websocket_upgrade: bool = false,
12741281
};
12751282

12761283
pub fn queue(
@@ -1494,6 +1501,7 @@ pub fn Bun__fetch_(
14941501
var memory_reporter = bun.handleOom(bun.default_allocator.create(bun.MemoryReportingAllocator));
14951502
// used to clean up dynamically allocated memory on error (a poor man's errdefer)
14961503
var is_error = false;
1504+
var is_websocket_upgrade = false;
14971505
var allocator = memory_reporter.wrap(bun.default_allocator);
14981506
errdefer bun.default_allocator.destroy(memory_reporter);
14991507
defer {
@@ -2198,6 +2206,15 @@ pub fn Bun__fetch_(
21982206
}
21992207
}
22002208

2209+
if (headers_.fastGet(bun.webcore.FetchHeaders.HTTPHeaderName.Upgrade)) |_upgrade| {
2210+
const upgrade = _upgrade.toSlice(bun.default_allocator);
2211+
defer upgrade.deinit();
2212+
const slice = upgrade.slice();
2213+
if (bun.strings.eqlComptime(slice, "websocket")) {
2214+
is_websocket_upgrade = true;
2215+
}
2216+
}
2217+
22012218
break :extract_headers Headers.from(headers_, allocator, .{ .body = body.getAnyBlob() }) catch |err| bun.handleOom(err);
22022219
}
22032220

@@ -2333,7 +2350,7 @@ pub fn Bun__fetch_(
23332350
}
23342351
}
23352352

2336-
if (!method.hasRequestBody() and body.hasBody()) {
2353+
if (!method.hasRequestBody() and body.hasBody() and !is_websocket_upgrade) {
23372354
const err = globalThis.toTypeError(.INVALID_ARG_VALUE, fetch_error_unexpected_body, .{});
23382355
is_error = true;
23392356
return JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err);
@@ -2651,6 +2668,7 @@ pub fn Bun__fetch_(
26512668
.ssl_config = ssl_config,
26522669
.hostname = hostname,
26532670
.memory_reporter = memory_reporter,
2671+
.is_websocket_upgrade = is_websocket_upgrade,
26542672
.check_server_identity = if (check_server_identity.isEmptyOrUndefinedOrNull()) .empty else .create(check_server_identity, globalThis),
26552673
.unix_socket_path = unix_socket_path,
26562674
},

src/http.zig

Lines changed: 85 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,9 @@ pub const Flags = packed struct(u16) {
405405
is_preconnect_only: bool = false,
406406
is_streaming_request_body: bool = false,
407407
defer_fail_until_connecting_is_complete: bool = false,
408-
_padding: u5 = 0,
408+
is_websockets: bool = false,
409+
websocket_upgraded: bool = false,
410+
_padding: u3 = 0,
409411
};
410412

411413
// TODO: reduce the size of this struct
@@ -592,6 +594,11 @@ pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
592594
hashHeaderConst("Accept-Encoding") => {
593595
override_accept_encoding = true;
594596
},
597+
hashHeaderConst("Upgrade") => {
598+
if (std.ascii.eqlIgnoreCase(this.headerStr(header_values[i]), "websocket")) {
599+
this.flags.is_websockets = true;
600+
}
601+
},
595602
hashHeaderConst(chunked_encoded_header.name) => {
596603
// We don't want to override chunked encoding header if it was set by the user
597604
add_transfer_encoding = false;
@@ -1019,11 +1026,14 @@ fn writeToStreamUsingBuffer(this: *HTTPClient, comptime is_ssl: bool, socket: Ne
10191026
// no data to send so we are done
10201027
return false;
10211028
}
1022-
10231029
pub fn writeToStream(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, data: []const u8) void {
10241030
log("flushStream", .{});
10251031
var stream = &this.state.original_request_body.stream;
10261032
const stream_buffer = stream.buffer orelse return;
1033+
if (this.flags.is_websockets and !this.flags.websocket_upgraded) {
1034+
// cannot drain yet, websocket is waiting for upgrade
1035+
return;
1036+
}
10271037
const buffer = stream_buffer.acquire();
10281038
const wasEmpty = buffer.isEmpty() and data.len == 0;
10291039
if (wasEmpty and stream.ended) {
@@ -1324,56 +1334,78 @@ pub fn handleOnDataHeaders(
13241334
) void {
13251335
log("handleOnDataHeaders", .{});
13261336
var to_read = incoming_data;
1327-
var amount_read: usize = 0;
1328-
var needs_move = true;
1329-
if (this.state.response_message_buffer.list.items.len > 0) {
1330-
// this one probably won't be another chunk, so we use appendSliceExact() to avoid over-allocating
1331-
bun.handleOom(this.state.response_message_buffer.appendSliceExact(incoming_data));
1332-
to_read = this.state.response_message_buffer.list.items;
1333-
needs_move = false;
1334-
}
1335-
1336-
// we reset the pending_response each time wich means that on parse error this will be always be empty
1337-
this.state.pending_response = picohttp.Response{};
1338-
1339-
// minimal http/1.1 request size is 16 bytes without headers and 26 with Host header
1340-
// if is less than 16 will always be a ShortRead
1341-
if (to_read.len < 16) {
1342-
log("handleShortRead", .{});
1343-
this.handleShortRead(is_ssl, incoming_data, socket, needs_move);
1344-
return;
1345-
}
13461337

1347-
var response = picohttp.Response.parseParts(
1348-
to_read,
1349-
&shared_response_headers_buf,
1350-
&amount_read,
1351-
) catch |err| {
1352-
switch (err) {
1353-
error.ShortRead => {
1354-
this.handleShortRead(is_ssl, incoming_data, socket, needs_move);
1355-
},
1356-
else => {
1357-
this.closeAndFail(err, is_ssl, socket);
1358-
},
1338+
while (true) {
1339+
var amount_read: usize = 0;
1340+
var needs_move = true;
1341+
if (this.state.response_message_buffer.list.items.len > 0) {
1342+
// this one probably won't be another chunk, so we use appendSliceExact() to avoid over-allocating
1343+
bun.handleOom(this.state.response_message_buffer.appendSliceExact(incoming_data));
1344+
to_read = this.state.response_message_buffer.list.items;
1345+
needs_move = false;
13591346
}
1360-
return;
1361-
};
13621347

1363-
// we save the successful parsed response
1364-
this.state.pending_response = response;
1365-
1366-
const body_buf = to_read[@min(@as(usize, @intCast(response.bytes_read)), to_read.len)..];
1367-
// handle the case where we have a 100 Continue
1368-
if (response.status_code >= 100 and response.status_code < 200) {
1369-
log("information headers", .{});
1370-
// we still can have the 200 OK in the same buffer sometimes
1371-
if (body_buf.len > 0) {
1372-
log("information headers with body", .{});
1373-
this.onData(is_ssl, body_buf, ctx, socket);
1348+
// we reset the pending_response each time wich means that on parse error this will be always be empty
1349+
this.state.pending_response = picohttp.Response{};
1350+
1351+
// minimal http/1.1 request size is 16 bytes without headers and 26 with Host header
1352+
// if is less than 16 will always be a ShortRead
1353+
if (to_read.len < 16) {
1354+
log("handleShortRead", .{});
1355+
this.handleShortRead(is_ssl, incoming_data, socket, needs_move);
1356+
return;
13741357
}
1375-
return;
1358+
1359+
const response = picohttp.Response.parseParts(
1360+
to_read,
1361+
&shared_response_headers_buf,
1362+
&amount_read,
1363+
) catch |err| {
1364+
switch (err) {
1365+
error.ShortRead => {
1366+
this.handleShortRead(is_ssl, incoming_data, socket, needs_move);
1367+
},
1368+
else => {
1369+
this.closeAndFail(err, is_ssl, socket);
1370+
},
1371+
}
1372+
return;
1373+
};
1374+
1375+
// we save the successful parsed response
1376+
this.state.pending_response = response;
1377+
1378+
to_read = to_read[@min(@as(usize, @intCast(response.bytes_read)), to_read.len)..];
1379+
1380+
if (response.status_code == 101) {
1381+
if (!this.flags.is_websockets) {
1382+
// we cannot upgrade to websocket because the client did not request it!
1383+
this.closeAndFail(error.UnrequestedUpgrade, is_ssl, socket);
1384+
return;
1385+
}
1386+
// special case for websocket upgrade
1387+
this.flags.is_websockets = true;
1388+
this.flags.websocket_upgraded = true;
1389+
if (this.signals.upgraded) |upgraded| {
1390+
upgraded.store(true, .monotonic);
1391+
}
1392+
// start draining the request body
1393+
this.flushStream(is_ssl, socket);
1394+
break;
1395+
}
1396+
1397+
// handle the case where we have a 100 Continue
1398+
if (response.status_code >= 100 and response.status_code < 200) {
1399+
log("information headers", .{});
1400+
// we still can have the 200 OK in the same buffer sometimes
1401+
// 1XX responses MUST NOT include a message-body, therefore we need to continue parsing
1402+
1403+
continue;
1404+
}
1405+
1406+
break;
13761407
}
1408+
var response = this.state.pending_response.?;
13771409
const should_continue = this.handleResponseMetadata(
13781410
&response,
13791411
) catch |err| {
@@ -1409,14 +1441,14 @@ pub fn handleOnDataHeaders(
14091441

14101442
if (this.flags.proxy_tunneling and this.proxy_tunnel == null) {
14111443
// we are proxing we dont need to cloneMetadata yet
1412-
this.startProxyHandshake(is_ssl, socket, body_buf);
1444+
this.startProxyHandshake(is_ssl, socket, to_read);
14131445
return;
14141446
}
14151447

14161448
// we have body data incoming so we clone metadata and keep going
14171449
this.cloneMetadata();
14181450

1419-
if (body_buf.len == 0) {
1451+
if (to_read.len == 0) {
14201452
// no body data yet, but we can report the headers
14211453
if (this.signals.get(.header_progress)) {
14221454
this.progressUpdate(is_ssl, ctx, socket);
@@ -1426,7 +1458,7 @@ pub fn handleOnDataHeaders(
14261458

14271459
if (this.state.response_stage == .body) {
14281460
{
1429-
const report_progress = this.handleResponseBody(body_buf, true) catch |err| {
1461+
const report_progress = this.handleResponseBody(to_read, true) catch |err| {
14301462
this.closeAndFail(err, is_ssl, socket);
14311463
return;
14321464
};
@@ -1439,7 +1471,7 @@ pub fn handleOnDataHeaders(
14391471
} else if (this.state.response_stage == .body_chunk) {
14401472
this.setTimeout(socket, 5);
14411473
{
1442-
const report_progress = this.handleResponseBodyChunkedEncoding(body_buf) catch |err| {
1474+
const report_progress = this.handleResponseBodyChunkedEncoding(to_read) catch |err| {
14431475
this.closeAndFail(err, is_ssl, socket);
14441476
return;
14451477
};
@@ -2149,7 +2181,7 @@ pub fn handleResponseMetadata(
21492181
// [...] cannot contain a message body or trailer section.
21502182
// therefore in these cases set content-length to 0, so the response body is always ignored
21512183
// and is not waited for (which could cause a timeout)
2152-
if ((response.status_code >= 100 and response.status_code < 200) or response.status_code == 204 or response.status_code == 304) {
2184+
if ((response.status_code >= 100 and response.status_code < 200 and response.status_code != 101) or response.status_code == 204 or response.status_code == 304) {
21532185
this.state.content_length = 0;
21542186
}
21552187

@@ -2416,7 +2448,7 @@ pub fn handleResponseMetadata(
24162448
log("handleResponseMetadata: content_length is null and transfer_encoding {}", .{this.state.transfer_encoding});
24172449
}
24182450

2419-
if (this.method.hasBody() and (content_length == null or content_length.? > 0 or !this.state.flags.allow_keepalive or this.state.transfer_encoding == .chunked or is_server_sent_events)) {
2451+
if (this.method.hasBody() and (content_length == null or content_length.? > 0 or !this.state.flags.allow_keepalive or this.state.transfer_encoding == .chunked or is_server_sent_events or this.flags.websocket_upgraded)) {
24202452
return ShouldContinue.continue_streaming;
24212453
} else {
24222454
return ShouldContinue.finished;

src/http/Signals.zig

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,24 @@ header_progress: ?*std.atomic.Value(bool) = null,
44
body_streaming: ?*std.atomic.Value(bool) = null,
55
aborted: ?*std.atomic.Value(bool) = null,
66
cert_errors: ?*std.atomic.Value(bool) = null,
7+
upgraded: ?*std.atomic.Value(bool) = null,
78
pub fn isEmpty(this: *const Signals) bool {
8-
return this.aborted == null and this.body_streaming == null and this.header_progress == null and this.cert_errors == null;
9+
return this.aborted == null and this.body_streaming == null and this.header_progress == null and this.cert_errors == null and this.upgraded == null;
910
}
1011

1112
pub const Store = struct {
1213
header_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
1314
body_streaming: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
1415
aborted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
1516
cert_errors: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
17+
upgraded: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
1618
pub fn to(this: *Store) Signals {
1719
return .{
1820
.header_progress = &this.header_progress,
1921
.body_streaming = &this.body_streaming,
2022
.aborted = &this.aborted,
2123
.cert_errors = &this.cert_errors,
24+
.upgraded = &this.upgraded,
2225
};
2326
}
2427
};

0 commit comments

Comments
 (0)