logo
ZioChannel

Channel

概述

Channel 是协程之间传递消息的主要方式,提供类型安全的消息传递机制。ZIO 的 Channel 类似于 Go 语言的 channel,但针对 Zig 的内存模型进行了优化。

实现

Channel实际是一个泛型,其内部实现为ChannelImpl

const ChannelImpl = struct {
    buffer: [*]u8,           // 环形缓冲区
    elem_size: usize,        // 元素大小
    capacity: usize,         // 缓冲区容量
    
    head: usize = 0,         // 读取位置
    tail: usize = 0,         // 写入位置
    count: usize = 0,        // 当前元素数
    
    mutex: Mutex = .init,    // 保护内部状态
    receiver_queue: SimpleQueue(WaitNode) = .empty,  // 等待的接收者
    sender_queue: SimpleQueue(WaitNode) = .empty,    // 等待的发送者
    
    closed: bool = false,    // 关闭标志
};

channel在初始化时接收一个T类型的切片,它会直接转换为字节切片,然后赋值给buffer,elem_size会记录T类型的大小,capacity则记录了字节切片的大小。channel是一个环形缓冲区,通过写指针tail和读取指针head进行管理,count是T类型元素个数,其等于(tail-head)/elem_size。当buffer还有空闲容量时,接收或者发送都会先从buffer获取结果。

而接收者和发送者实际上是等待队列,当无法写入元素到buffer时就会进行阻塞,并将等待者放入对应的队列中。

而Channel实现异步等待主要依靠Waiter结构,Waiter是用于异步操作的栈分配等待器,它支持两种模式:

  • direct:用于单个future的等待,拥有任务的所有权并发送通知

  • select:用于多个future的select,指向一个父级directr等待器

基本使用样例

var waiter = Waiter.init();
future.asyncWait(&waiter);
try waiter.wait(1, .allow_cancel);

等待者的wait函数相当于切换协程,而通知恢复协程使用signal函数

实际channel receive处理逻辑

  1. 判断缓冲区是否有元素,如果有直接从缓冲区获取,并且如果发送者队列有等待者,那么将这个等待者数据放入缓冲区并提醒等待者恢复执行逻辑

  2. 如果缓冲区元素为空,而发送者队列非空,则直接从发送者队列获取等待者,获取数据并通知协程恢复

  3. 加入接收者等待队列

实际channel send处理逻辑

  1. 如果接收者队列非空,直接将数据给接收者队列的等待者,并通知恢复协程

  2. 如果缓冲区还有剩余空间,则将数据放入缓冲区

  3. 加入发送者等待队列

BroadcastChannel

Channel实际上是多生产者单消费者(Multi-Producer Single-Consumer)通道,而BoardcastChannel是广播通道,一条消息发送给所有订阅者。

var broadcast = zio.BroadcastChannel(Event).init(allocator);
defer broadcast.deinit(allocator);

// 订阅者
var sub1 = broadcast.subscribe();
defer sub1.deinit();

var sub2 = broadcast.subscribe();
defer sub2.deinit();

// 发布消息
try broadcast.publish(.{ .type = .update });

// 所有订阅者都会收到
const e1 = sub1.receive();
const e2 = sub2.receive();

基本使用

接收发送

接收发送分为阻塞和非阻塞两种方式,阻塞方式receivesend,非阻塞方式tryReceivetrySend,非阻塞方式如果无法发送或接收,会立即返回错误ChannelClosedChannelFullChannelClosed

示例代码

var channel: zio.Channel(Message) = .init;

fn producer(channel: *zio.Channel(Message)) void {
    var i: usize = 0;
    while (i < 100) : (i += 1) {
        const msg = Message{ .id = i, .data = generateData() };
        channel.send(msg) catch |err| {
            std.log.err("Send failed: {}", .{err});
            return;
        };
    }
}

关闭处理

channel支持两种关闭模式

pub const CloseMode = enum {
    /// 优雅关闭 - 允许接收者读取缓冲的消息
    graceful,
    
    /// 立即关闭 - 清空缓冲,接收者立即收到错误
    immediate,
};

如果对channel直接调用close,相当于设置graceful。如果要立即关闭,需要执行

channel.closeWithMode(.immediate);

生产者消费者模式

基本模式

const std = @import("std");
const zio = @import("zio");

const Task = struct {
    id: usize,
    data: []const u8,
};

fn producer(id: usize, channel: *zio.Channel(Task)) void {
    var i: usize = 0;
    while (i < 100) : (i += 1) {
        const task = Task{
            .id = id * 1000 + i,
            .data = generateData(),
        };
        
        channel.send(task) catch {
            std.log.info("Producer {}: channel closed", .{id});
            return;
        };
    }
}

fn consumer(id: usize, channel: *zio.Channel(Task)) void {
    while (true) {
        const task = channel.receive() catch {
            std.log.info("Consumer {}: channel closed", .{id});
            return;
        };
        
        processTask(task);
    }
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    const rt = try zio.Runtime.init(gpa.allocator(), .{});
    defer rt.deinit();

    var channel = zio.Channel(Task).initWithCapacity(gpa.allocator(), 100);
    defer channel.deinit(gpa.allocator());

    var group: zio.Group = .init;
    defer group.cancel();

    // 多个生产者
    for (0..3) |id| {
        try group.spawn(producer, .{ id, &channel });
    }
    
    // 多个消费者
    for (0..2) |id| {
        try group.spawn(consumer, .{ id, &channel });
    }
}

工作窃取模式

fn worker(id: usize, channels: []zio.Channel(Task)) void {
    while (true) {
        // 先检查自己的通道
        if (channels[id].tryReceive()) |task| {
            process(task);
            continue;
        } else |_| {}
        
        // 尝试从其他通道窃取
        var stolen = false;
        for (channels, 0..) |*ch, i| {
            if (i == id) continue;
            
            if (ch.tryReceive()) |task| {
                process(task);
                stolen = true;
                break;
            } else |_| {}
        }
        
        if (!stolen) {
            // 没有工作,等待
            zio.sleep(.fromMillis(10));
        }
    }
}

优先级队列

const Priority = enum { high, normal, low };

const PriorityTask = struct {
    priority: Priority,
    task: Task,
};

fn priorityConsumer(channel: *zio.Channel(PriorityTask)) void {
    // 简化的优先级处理
    // 实际实现可能需要多个通道或优先级队列
    while (true) {
        const item = channel.receive() catch break;
        
        switch (item.priority) {
            .high => processImmediately(item.task),
            .normal => processNormal(item.task),
            .low => processWhenIdle(item.task),
        }
    }
}

Select组合

多通道选择

fn selectFromMultipleChannels(
    ch1: *zio.Channel(Message),
    ch2: *zio.Channel(Message),
    timeout: zio.Timeout,
) void {
    while (true) {
        const result = zio.select(.{
            ch1.asyncWaitReceive(),
            ch2.asyncWaitReceive(),
            timeout.asyncWait(),
        });
        
        switch (result) {
            .item_0 => |msg| processFromCh1(msg),
            .item_1 => |msg| processFromCh2(msg),
            .item_2 => {
                std.log.info("Timeout", .{});
                return;
            },
        }
    }
}

超时发送

fn sendWithTimeout(
    channel: *zio.Channel(Message),
    msg: Message,
    duration: zio.Duration,
) !void {
    var timeout = zio.Timeout.fromDuration(duration);
    
    const result = zio.select(.{
        channel.asyncWaitSend(msg),
        timeout.asyncWait(),
    });
    
    switch (result) {
        .item_0 => return,  // 发送成功
        .item_1 => return error.Timeout,
    }
}