Channel
概述
Channel 是协程之间传递消息的主要方式,提供类型安全的消息传递机制。ZIO 的 Channel 类似于 Go 语言的 channel,但针对 Zig 的内存模型进行了优化。
实现
Channel实际是一个泛型,其内部实现为ChannelImpl
const ChannelImpl = struct {
buffer: [*]u8, // 环形缓冲区
elem_size: usize, // 元素大小
capacity: usize, // 缓冲区容量
head: usize = 0, // 读取位置
tail: usize = 0, // 写入位置
count: usize = 0, // 当前元素数
mutex: Mutex = .init, // 保护内部状态
receiver_queue: SimpleQueue(WaitNode) = .empty, // 等待的接收者
sender_queue: SimpleQueue(WaitNode) = .empty, // 等待的发送者
closed: bool = false, // 关闭标志
};
channel在初始化时接收一个T类型的切片,它会直接转换为字节切片,然后赋值给buffer,elem_size会记录T类型的大小,capacity则记录了字节切片的大小。channel是一个环形缓冲区,通过写指针tail和读取指针head进行管理,count是T类型元素个数,其等于(tail-head)/elem_size。当buffer还有空闲容量时,接收或者发送都会先从buffer获取结果。
而接收者和发送者实际上是等待队列,当无法写入元素到buffer时就会进行阻塞,并将等待者放入对应的队列中。
而Channel实现异步等待主要依靠Waiter结构,Waiter是用于异步操作的栈分配等待器,它支持两种模式:
-
direct:用于单个future的等待,拥有任务的所有权并发送通知
-
select:用于多个future的select,指向一个父级directr等待器
基本使用样例
var waiter = Waiter.init();
future.asyncWait(&waiter);
try waiter.wait(1, .allow_cancel);
等待者的wait函数相当于切换协程,而通知恢复协程使用signal函数
实际channel receive处理逻辑:
-
判断缓冲区是否有元素,如果有直接从缓冲区获取,并且如果发送者队列有等待者,那么将这个等待者数据放入缓冲区并提醒等待者恢复执行逻辑
-
如果缓冲区元素为空,而发送者队列非空,则直接从发送者队列获取等待者,获取数据并通知协程恢复
-
加入接收者等待队列
实际channel send处理逻辑:
-
如果接收者队列非空,直接将数据给接收者队列的等待者,并通知恢复协程
-
如果缓冲区还有剩余空间,则将数据放入缓冲区
-
加入发送者等待队列
BroadcastChannel
Channel实际上是多生产者单消费者(Multi-Producer Single-Consumer)通道,而BoardcastChannel是广播通道,一条消息发送给所有订阅者。
var broadcast = zio.BroadcastChannel(Event).init(allocator);
defer broadcast.deinit(allocator);
// 订阅者
var sub1 = broadcast.subscribe();
defer sub1.deinit();
var sub2 = broadcast.subscribe();
defer sub2.deinit();
// 发布消息
try broadcast.publish(.{ .type = .update });
// 所有订阅者都会收到
const e1 = sub1.receive();
const e2 = sub2.receive();
基本使用
接收发送
接收发送分为阻塞和非阻塞两种方式,阻塞方式receive和send,非阻塞方式tryReceive和trySend,非阻塞方式如果无法发送或接收,会立即返回错误ChannelClosed、ChannelFull或ChannelClosed。
示例代码
var channel: zio.Channel(Message) = .init;
fn producer(channel: *zio.Channel(Message)) void {
var i: usize = 0;
while (i < 100) : (i += 1) {
const msg = Message{ .id = i, .data = generateData() };
channel.send(msg) catch |err| {
std.log.err("Send failed: {}", .{err});
return;
};
}
}
fn trySendExample(channel: *zio.Channel(Message)) void {
const msg = Message{ .id = 1 };
if (channel.trySend(msg)) {
std.log.info("Sent successfully", .{});
} else |err| switch (err) {
error.ChannelFull => {
std.log.warn("Channel full, will retry later", .{});
// 可以做其他事情再重试
},
error.ChannelClosed => {
std.log.err("Channel closed", .{});
},
}
}
fn consumer(channel: *zio.Channel(Message)) void {
while (true) {
const msg = channel.receive() catch |err| {
if (err == error.ChannelClosed) {
std.log.info("Channel closed, exiting", .{});
break;
}
std.log.err("Receive failed: {}", .{err});
continue;
};
processMessage(msg);
}
}
fn tryReceiveExample(channel: *zio.Channel(Message)) void {
if (channel.tryReceive()) |msg| {
processMessage(msg);
} else |err| switch (err) {
error.ChannelEmpty => {
// 没有消息,做其他事
doOtherWork();
},
error.ChannelClosed => {
// 通道已关闭
},
}
}
关闭处理
channel支持两种关闭模式
pub const CloseMode = enum {
/// 优雅关闭 - 允许接收者读取缓冲的消息
graceful,
/// 立即关闭 - 清空缓冲,接收者立即收到错误
immediate,
};
如果对channel直接调用close,相当于设置graceful。如果要立即关闭,需要执行
channel.closeWithMode(.immediate);
生产者消费者模式
基本模式
const std = @import("std");
const zio = @import("zio");
const Task = struct {
id: usize,
data: []const u8,
};
fn producer(id: usize, channel: *zio.Channel(Task)) void {
var i: usize = 0;
while (i < 100) : (i += 1) {
const task = Task{
.id = id * 1000 + i,
.data = generateData(),
};
channel.send(task) catch {
std.log.info("Producer {}: channel closed", .{id});
return;
};
}
}
fn consumer(id: usize, channel: *zio.Channel(Task)) void {
while (true) {
const task = channel.receive() catch {
std.log.info("Consumer {}: channel closed", .{id});
return;
};
processTask(task);
}
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const rt = try zio.Runtime.init(gpa.allocator(), .{});
defer rt.deinit();
var channel = zio.Channel(Task).initWithCapacity(gpa.allocator(), 100);
defer channel.deinit(gpa.allocator());
var group: zio.Group = .init;
defer group.cancel();
// 多个生产者
for (0..3) |id| {
try group.spawn(producer, .{ id, &channel });
}
// 多个消费者
for (0..2) |id| {
try group.spawn(consumer, .{ id, &channel });
}
}
工作窃取模式
fn worker(id: usize, channels: []zio.Channel(Task)) void {
while (true) {
// 先检查自己的通道
if (channels[id].tryReceive()) |task| {
process(task);
continue;
} else |_| {}
// 尝试从其他通道窃取
var stolen = false;
for (channels, 0..) |*ch, i| {
if (i == id) continue;
if (ch.tryReceive()) |task| {
process(task);
stolen = true;
break;
} else |_| {}
}
if (!stolen) {
// 没有工作,等待
zio.sleep(.fromMillis(10));
}
}
}
优先级队列
const Priority = enum { high, normal, low };
const PriorityTask = struct {
priority: Priority,
task: Task,
};
fn priorityConsumer(channel: *zio.Channel(PriorityTask)) void {
// 简化的优先级处理
// 实际实现可能需要多个通道或优先级队列
while (true) {
const item = channel.receive() catch break;
switch (item.priority) {
.high => processImmediately(item.task),
.normal => processNormal(item.task),
.low => processWhenIdle(item.task),
}
}
}
Select组合
多通道选择
fn selectFromMultipleChannels(
ch1: *zio.Channel(Message),
ch2: *zio.Channel(Message),
timeout: zio.Timeout,
) void {
while (true) {
const result = zio.select(.{
ch1.asyncWaitReceive(),
ch2.asyncWaitReceive(),
timeout.asyncWait(),
});
switch (result) {
.item_0 => |msg| processFromCh1(msg),
.item_1 => |msg| processFromCh2(msg),
.item_2 => {
std.log.info("Timeout", .{});
return;
},
}
}
}
超时发送
fn sendWithTimeout(
channel: *zio.Channel(Message),
msg: Message,
duration: zio.Duration,
) !void {
var timeout = zio.Timeout.fromDuration(duration);
const result = zio.select(.{
channel.asyncWaitSend(msg),
timeout.asyncWait(),
});
switch (result) {
.item_0 => return, // 发送成功
.item_1 => return error.Timeout,
}
}
Last updated today
Built with Documentation.AI