Use common network runtime for telemetry messages #1796
mrdimidium wants to merge 6 commits into main
Conversation
src/telemetry/lightpanda.zig
Outdated
```zig
pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_event: telemetry.Event) !void {
    const event = try self.mem_pool.create();
    event.* = .{
```
send is what the rest of the code ends up calling. On main, all this does is create an event, queue it, and signal the worker thread.
In this version, once the buffer is full, it serializes JSON, has to acquire 3 mutexes, and writes to the pipe. Quite a bit more work that can slow down a caller.
Yeah. I think a better solution is setInterval, which is called from main and flushes on the main thread, regardless of when the events are posted. I simplified it because serializing 20 events seems cheap. But if you're worried, let me fix it.
It's not so much the JSON serialization (the original version retained the allocation for the writer/buffer). It's just all those extra opaque network things that could block, either now or in the future. It's very easy to change, say, getConnection and not realize "oh, this will impact all callers of telemetry".
The last commit implements interval flushing on the main thread. Now the CDP thread only acquires a mutex and stores the event in the buffer.
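A hedged sketch of what that split might look like (the field and helper names `pending`, `pcount`, `scratch`, and `sendBatch` are illustrative, not taken from the PR): CDP threads only append under the mutex, and the main-loop tick swaps the buffer out so serialization and network I/O happen outside the lock.

```zig
// Hypothetical main-thread interval flush: copy the pending events out
// under the mutex, then do the expensive work without holding it.
fn flushTick(self: *LightPanda) void {
    self.mutex.lock();
    const count = self.pcount;
    @memcpy(self.scratch[0..count], self.pending[0..count]);
    self.pcount = 0;
    self.mutex.unlock();

    if (count == 0) return;
    // serialize + write on the main thread, outside the lock
    self.sendBatch(self.scratch[0..count]);
}
```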
src/telemetry/lightpanda.zig
Outdated
```zig
self.mutex.lock();
defer self.mutex.unlock();

self.pending[self.pcount] = .{
```
Pretty sure there's a window where this can overflow.
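One way to close that window, sketched under the same assumptions as the diff (`pending`/`pcount` from the quoted code; `overflow_count` is a hypothetical field): check capacity under the same mutex that guards the write, and count the drop instead of writing past the end.

```zig
self.mutex.lock();
defer self.mutex.unlock();

if (self.pcount >= self.pending.len) {
    // Buffer is full: drop the event, but remember that we did so a
    // buffer_overflow event can be reported on the next flush.
    self.overflow_count += 1;
    return;
}
self.pending[self.pcount] = event;
self.pcount += 1;
```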
src/telemetry/lightpanda.zig
Outdated
```zig
        return into[0..i];
    }
};
const conn = self.runtime.getConnection() orelse return;
```
If there's no connection available, we lose the data?
The event buffer is twice the batch size, so we'll keep trying to send events. But when the buffer runs out, yes, we'll start losing events.
I don't think that's right? pfcount is reset to 0 a few lines above, so if there's no connection, there's no retry for those events.
You're right, it's a regression. Connection should be taken first.
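A sketch of the reordering being agreed on here (the `scratch` buffer and `writeBatch` helper are hypothetical names): acquire the connection before consuming the pending events, so a missing connection leaves them queued for the next flush instead of being silently dropped.

```zig
// Take the connection first: if none is available, the pending
// events stay in the buffer and will be retried on the next flush.
const conn = self.runtime.getConnection() orelse return;

self.mutex.lock();
const count = self.pcount;
@memcpy(self.scratch[0..count], self.pending[0..count]);
self.pcount = 0;
self.mutex.unlock();

try self.writeBatch(conn, self.scratch[0..count]);
```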
src/network/Runtime.zig
Outdated
```zig
if (self.pollfds[1].revents == 0) continue;
self.pollfds[1].revents = 0;
// accept new connections
if (self.pollfds[1].revents != 0) {
```
Minor, but I think self.pollfds[0] and self.pollfds[1] can be stored in a local outside the loop.
I'm not sure I understand. They're set in init/bind, but revents are changed by poll at each iteration. Or am I misunderstanding the idea?
```zig
const poll_fd = &self.pollfds[0];
const listen_fd = &self.pollfds[1];
while (true) {
    self.drainQueue();
```

I guess without bounds checking in release mode, it isn't a huge win.
src/network/Runtime.zig
Outdated
```zig
}

listener.onAccept(listener.ctx, socket);
if (self.shutdown.load(.acquire) and running_handles == 0)
```
So if we're shutting down but there are handles, we'll continue to process the handles? Is that right?
The idea is that we should try to send buffered telemetry events when we receive sigterm, but we shouldn't accept new connections.
Hmm... that makes sense. It deserves a comment. But it would also "hang" the shutdown until any other network activity completes, right? Not just telemetry. Like if I have a website that downloads a script, and in our done callback we execute it, it could download another script and repeat forever. That would block the shutdown?
I plan to stop the CDP threads when we receive a SIGTERM, and they'll explicitly cancel their requests.
SigHandler has logic for resuscitation, so if the user presses ctrl-c again, we'll kill the process. But yes, it could potentially hang. We can add a timeout to the SigHandler to kill everyone after a minute.
```zig
// If we were woken up, perhaps everything was cancelled and the iteration can be completed.
if (self.shutdown.load(.acquire)) break;
while (true)
```
I'm not sure, but doesn't checking for shutdown when self.pollfds[0].revents != 0 make the most sense? You check it in acceptConnection, once, and at the end of the loop, but if it's true, it'll be true here first. The other two checks don't seem necessary if you check here, and I think it better documents the flow, e.g. pollfds[0] is used to signal shutdown.
I'm not sure I understand the idea. We don't terminate the loop when we receive a shutdown because we're trying to send scheduled telemetry while no longer accepting new connections.
Yes, now that I understand that the shutdown doesn't want to shut down right away, I understand why it's how it is. Hence why I think this line needs a comment:

```zig
if (self.shutdown.load(.acquire) and running_handles == 0)
```
Also, can we clean up the names? I know I'm guilty of this too, but as much as possible I'd like namespaces to match the file names. Rename the imports to match. There's also one place where http.zig is imported as Net.
Force-pushed 9165e9b to b4d92d2
I haven't done a full re-review, because I think the design needs more work. It just has too much potential to lose messages. If I run …, then in the first iteration a call to …

(The flush on deinit also has me a little uneasy.)
Force-pushed b4d92d2 to c7edd99
src/telemetry/lightpanda.zig
Outdated
```zig
    };
}

// const URL = "https://telemetry.lightpanda.io";
const URL = "http://localhost:9876";
```
src/telemetry/lightpanda.zig
Outdated
```zig
    }
};

const conn = self.runtime.getConnection() orelse {
```
Makes sense to make sure we can get a connection before serializing? Not sure, your call.
```zig
pub const Event = union(enum) {
    run: void,
    navigate: Navigate,
    buffer_overflow: BufferOverflow,
```
@krichprollsch do we need to change anything to accept a new telemetry type?
Yes, I have to update the telemetry side. But we can merge; we will just lose these specific events in the meantime.
```zig
const iid: ?[]const u8 = if (self.iid) |*id| id else null;

for (h..t) |i| {
```
The server has a max body size of 500KB (last I heard), so we might need to limit this to ~50?
Different events have different sizes; it's easier to explicitly write events to the writer until we fit within 500KB.
The limit on the server side can be changed; 500KB is conservative, so we can safely go higher.
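A hedged sketch of the size-capped batching suggested above (all names here are illustrative; the JSON API shape depends on the Zig version in use): serialize events one at a time and stop before the body exceeds the server limit, leaving the rest buffered for the next flush.

```zig
// Hypothetical: cap the batch by serialized size rather than by count,
// since different events serialize to very different sizes.
const max_body_size = 500 * 1024; // conservative server-side limit

var body = std.ArrayList(u8).init(allocator);
defer body.deinit();

var sent: usize = 0;
for (events) |event| {
    const json = try std.json.stringifyAlloc(allocator, event, .{});
    defer allocator.free(json);
    // +1 for the trailing newline separator.
    if (body.items.len + json.len + 1 > max_body_size) break;
    try body.appendSlice(json);
    try body.append('\n');
    sent += 1;
}
// events[sent..] stay in the buffer for the next batch.
```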
```zig
self.fireTicks();

listener.onAccept(listener.ctx, socket);
if (self.shutdown.load(.acquire) and running_handles == 0) {
```
When I tested it, the batch size was always 1, which is good because it's on the main thread, but a bit less efficient in other ways (including on the server, cc @krichprollsch). But when I hit ctrl-c, the program takes a long time to exit. It just keeps processing CDP requests and keeps sending batches of 1. It took maybe 1 minute to properly shut down.
Is there a way to improve this? Killing a process but having it run for 1 minute firing off telemetry isn't great. Do we have any way to disconnect all other clients and synchronously flush the telemetry?
Adds support for libcurl requests to the network runtime. The first consumer is telemetry, which now serves as a thin event broker. Since Runtime doesn't support timeouts/intervals, events are flushed either upon receiving an event or upon server termination.