Zephyr内核对象-数据传递之邮箱

Creative Commons
本作品采用知识共享署名

本文分析说明Zephyr mailbox的使用和实现。

原理简介

Zephyr的邮箱(mailbox)可以看作是升级版的消息队列(msgq),与msgq不一样的是:邮箱的消息可以指定发送者和接收者,邮箱消息的大小可以不固定,长度也没有对齐的要求。此外邮箱只能用于线程间交换消息,不能用于ISR内。

邮箱构成和特性

Zephyr允许定义任意数量的邮箱,最大数量只由可用内存的量决定。
一个邮箱主要是由发送队列和消息队列两个wait_q组才,如下定义:

1
2
3
4
5
struct k_mbox {
_wait_q_t tx_msg_queue;
_wait_q_t rx_msg_queue;
struct k_spinlock lock;
};

  • 发送队列:用于保存已发送但还未被接收的消息
  • 接收队列:用于保存正在等待消息的线程

当发送线程发送消息到邮箱后会阻塞等待(同步发送)接收线程完成消息接收才退出等待,或是不阻塞而是等待接收线程完成消息接收后通过信号量通知(异步发送)。
邮箱支持:

  • 一对一: A发到邮箱的消息只能B收, B只收A的消息
  • 一对多: A发到邮箱的消息任意线程都可以收,注意不是广播。
  • 多对一:B线程可以收任意线程发到邮箱中消息。

邮箱不支持广播,邮箱中的消息只能被一个线程使用。

消息

消息描述符

消息是由消息描述符描述,指定消息的内存地址,并描述消息如何被邮箱处理。发送线程和接收线程在访问邮箱时都需要提供消息描述符,消息描述符相互匹配才能传递消息。
消息描述符的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct k_mbox_msg {
uint32_t _mailbox;
size_t size;
uint32_t info;
void *tx_data;
void *_rx_data;
struct k_mem_block tx_block;
k_tid_t rx_source_thread;
k_tid_t tx_target_thread;
k_tid_t _syncing_thread;
#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)
struct k_sem *_async_sem;
#endif
};

_mailbox_rx_data这两个字段已经不再使用
_syncing_thread 用于通知发送者发送的消息已经被接收,_async_sem用于异步发送后通知消息已经完成,这两个字段都是内部使用。

消息描述符中容纳消息的方式有两种:

  • 缓冲: 给定一段buffer,消息数据放于其中
  • 内存块:申请的一片内存块,消息数据放于其中

实际的消息发送二选一,如果二者都为NULL,则表示发送空消息。
tx_data 发送缓冲消息的内存指针,如果是内存块消息或者空消息该字段为NULL,接收者不初始化该字段
tx_block 内存块id,发送内存块消息时使用该字段。如果是缓冲消息或是空消息该字段为NULL,接收者不初始化该字段。
size 消息大小,允许为0,为0时表示空消息. 接收时表示允许接收消息的最大size, 如果收到的消息不是想要的发送方的size将为设为0,如果消息被接收,发送方的size将被更新为实际接收的大小。
info 发送者和接收者在消息接收时双向交换字段,32bit大小
tx_target_thread 接收消息的线程id,接收者消息不初始化该字段,接收者接收该消息后邮箱会将该字段更新为为实际接收者的id。当指定为K_ANY时该消息可以被任意线程接收。
rx_source_thread 发送消息的线程id,发送者消息不初始化该字段,接收者接收该消息后邮箱会将该字段更新为为实际发送者的id。当指定为K_ANY时该线程可接收任意消息。

消息的生命周期

创建消息:发送线程将消息发送到邮箱
消息保存在邮箱
删除消息:接收线程从邮箱接收消息,并将其消耗

使用

API

#define K_MBOX_DEFINE(name)
作用:定义一个k_mbox,并初始化
name: k_mbox name

void k_mbox_init(struct k_mbox *mbox)
作用:初始化k_mbox
mbox: 要初始化的mbox
返回值: 0标示初始化成功

int k_mbox_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg, k_timeout_t timeout)
作用:同步发送消息到邮箱,会等到有线程从邮箱接收,或是超时才退出。如果发送超时消息将不会被保存在邮箱中
mbox: 邮箱
tx_msg: 发送消息的描述符,包含了接收者,消息信息,见前文说明
timeout: 等待时间,单位ms。K_NO_WAIT不等待, K_FOREVER一直等
返回值:0发送成果,-ENOMSG不等待发送失败,-EAGAIN 等待超时

void k_mbox_async_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg, struct k_sem *sem)
作用:异步发送消息到邮箱,发送后立即返回,有线程从邮箱接收该消息时使用sem通知。
mbox: 邮箱
tx_msg: 发送消息的描述符,包含了接收者,消息信息,见前文说明
sem:接收消息通知信号量

int k_mbox_get(struct k_mbox *mbox, struct k_mbox_msg *rx_msg, void *buffer, k_timeout_t timeout)
作用:从邮箱接收消息。
mbox: 邮箱
rx_msg: 接收消息的描述符,指定发送者,接收消息的信息,见前文说明
buffer: 接收消息数据存放buffer,如果为NULL,邮箱将保存该消息,消息的信息保存在rx_msg中,直到使用k_mbox_data_get消耗该消息才会从邮箱中删除
timeout: 等待时间,单位ms。K_NO_WAIT不等待, K_FOREVER一直等
返回值:0发送成果,-ENOMSG不等待发送失败,-EAGAIN 等待超时

void k_mbox_data_get(struct k_mbox_msg *rx_msg, void *buffer)
作用:将消息数据搬运到buffer中,并从邮箱中删除该消息。
rx_msg: 接收消息的描述符,包含了要接收消息的信息
buffer: 接收消息数据存放buffer,如果为NULL将直接丢弃该消息

使用说明

初始化

先定义初始化一个邮箱,下面两种方式的效果是一样的
使用宏

1
K_MBOX_DEFINE(my_mailbox);

使用函数

1
2
struct k_mbox my_mailbox;
k_mbox_init(&my_mailbox);

发送消息

同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void producer_thread(void)
{
char buffer[100];
int buffer_bytes_used;

struct k_mbox_msg send_msg;

while (1) {

//准备要发送的消息数据
...
buffer_bytes_used = ... ;
memcpy(buffer, source, buffer_bytes_used);

//准备消息描述符
send_msg.info = 123;
send_msg.size = buffer_bytes_used;
send_msg.tx_data = buffer;
send_msg.tx_block.data = NULL;
send_msg.tx_target_thread = consumer_thread_id;

//发送消息到mailbox,并等待被接收
k_mbox_put(&my_mailbox, &send_msg, K_FOREVER);

//接收完毕退出等待,检查info, size, tx_target_thread 的更新
//info和接收者交换会变为456
//size变为实际接收的30

/* verify that message data was fully received */
if (send_msg.size < buffer_bytes_used) {
printf("some message data dropped during transfer!");
printf("receiver only had room for %d bytes", send_msg.info);
}
}
}

异步,异步发送函数k_mbox_async_put在配置了CONFIG_NUM_MBOX_ASYNC_MSGS=y后才会有效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void producer_thread(void)
{
char buffer[100];
int buffer_bytes_used = 100;

struct k_mbox_msg send_msg;

struct k_sem rev_sem;
k_sem_init(&rev_sem, 0, 10);

while (1) {

//准备消息描述符
...
buffer_bytes_used = ... ;
memcpy(buffer, source, buffer_bytes_used);

/* prepare to send message */
send_msg.info = 123;
send_msg.size = buffer_bytes_used;
send_msg.tx_data = buffer;
send_msg.tx_block.data = NULL;
send_msg.tx_target_thread = consumer_thread_id;

//发送消息到mailbox
k_mbox_async_put(&my_mailbox, &send_msg, &rev_sem);

//等待消息被接收通知信号量
k_sem_take(&rev_sem, K_FOREVER);

//接收完毕退出等待,检查info, size, tx_target_thread 的更新
//info和接收者交换会变为456
//size变为实际接收的30

/* verify that message data was fully received */
if (send_msg.size < buffer_bytes_used) {
printf("some message data dropped during transfer!");
printf("receiver only had room for %d bytes", send_msg.info);
}
}
}

接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void consumer_thread(void)
{
struct k_mbox_msg recv_msg;
char buffer[100];

int i;
int total;

while (1) {
//准备消息描述符
recv_msg.info = 456;
recv_msg.size = 30;
recv_msg.rx_source_thread = producer_thread_id;

//等待接收消息
k_mbox_get(&my_mailbox, &recv_msg, buffer, K_FOREVER);

//接收完毕退出等待,检查info, size, rx_target_thread 的更新
//info和发收者交换会变为123
//size为实际接收的数据长度30

/* verify that message data was fully received */
if (recv_msg.info != recv_msg.size) {
printf("some message data dropped during transfer!");
printf("sender tried to send %d bytes", recv_msg.info);
}

/* compute sum of all message bytes (from 0 to 100 of them) */
total = 0;
for (i = 0; i < recv_msg.size; i++) {
total += buffer[i];
}
}
}

实现

该小节通过对邮箱内核代码的分析,理解Zephyr是如何实现以上描述的功能特性.
mailbox的实现代码在kernel\mailbox.c中

初始化

前文提到过mailbox的核心就是两个wait_q,初始化则是对发送和接收的wait_q进行初始话,并初始化一个lock用于操作wait_q时锁调度

1
2
3
4
5
6
void k_mbox_init(struct k_mbox *mbox)
{
z_waitq_init(&mbox->tx_msg_queue);
z_waitq_init(&mbox->rx_msg_queue);
mbox->lock = (struct k_spinlock) {};
}

发送消息

这里主要分析同步发送,k_mbox_put->mbox_message_put

1
2
3
4
5
6
7
8
9
10
int k_mbox_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg,
k_timeout_t timeout)
{
//发送消息的线程在消息被接收前会被放入tx_msg_queue中挂起,这里将当前thread保存在消息中的_syncing_thread,方便消息被接收后对_syncing_thread进行恢复
tx_msg->_syncing_thread = _current;

int ret = mbox_message_put(mbox, tx_msg, timeout);

return ret;
}

消息发送的主要流程位于mbox_message_put中,有下面两种情况:

  1. 遍历邮箱rx_msg_queue中等待消息的线程,如果找到匹配的接收线程,让接收线程变为就绪,挂起当前线程。
  2. 如果没找到匹配的接收线程,将消息放入tx_msg_queue,挂起当前线程并等待超时。

代码详细分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
static int mbox_message_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg,
k_timeout_t timeout)
{
struct k_thread *sending_thread;
struct k_thread *receiving_thread;
struct k_mbox_msg *rx_msg;
k_spinlock_key_t key;

//设置描述符中发送线程id为当前线程
tx_msg->rx_source_thread = _current;

//发送线程要交换的数据被设置为消息描述符
sending_thread = tx_msg->_syncing_thread;
sending_thread->base.swap_data = tx_msg;

//锁调度,开始处理消息发送
key = k_spin_lock(&mbox->lock);

//检查rx_msg_queue中是否有线程在等待接收邮箱消息
_WAIT_Q_FOR_EACH(&mbox->rx_msg_queue, receiving_thread) {
//获取接收消息描述符
rx_msg = (struct k_mbox_msg *)receiving_thread->base.swap_data;

//将收发消息描述进行匹配
if (mbox_message_match(tx_msg, rx_msg) == 0) {
//该消息有线程可匹配,让接收线程从挂起恢复
z_unpend_thread(receiving_thread);

//pend返回设置为0,表示有正常拿到消息
arch_thread_return_value_set(receiving_thread, 0);
//让接收线程处于就绪态,下一次调度的时候接收线程就会取得消息进行处理
z_ready_thread(receiving_thread);

#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)
//异步发送流程,接收线程处理消失时不做阻塞
if ((sending_thread->base.thread_state & _THREAD_DUMMY)
!= 0U) {
z_reschedule(&mbox->lock, key);
return 0;
}
#endif

//同步发送流程,异步线程处理消息时,发送消息线程被直接挂起,异步线程处理完后会通过线程交换数据中保存的tx_msg->_syncing_thread让发送线程进入就绪继续运行
int ret = z_pend_curr(&mbox->lock, key, NULL, K_FOREVER);

return ret;
}
}

//没有匹配的接收线程,同时也不等待消息发送,就立即退出
if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
k_spin_unlock(&mbox->lock, key);
return -ENOMSG;
}

#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)
//异步发送时会用一个dummy thread来进行等待消息接收完成,真正的发送线程会立即退出
if ((sending_thread->base.thread_state & _THREAD_DUMMY) != 0U) {
//这里是在发送线程中执行,而挂起的dummy thread,因此不会卡在这里
z_pend_thread(sending_thread, &mbox->tx_msg_queue, K_FOREVER);
k_spin_unlock(&mbox->lock, key);
return 0;
}
#endif
//同步发送,如果没有匹配的线程接收,当前线程被加入tx_msg_queue中,等待超时。因为tx_msg是放到当前线程的交换数据内,这个操作就相当于将消息放入了邮箱
int ret = z_pend_curr(&mbox->lock, key, &mbox->tx_msg_queue, timeout);

return ret;
}

接收

消息接收的流程和发送的流程是对称的,有下面两种情况:

  1. 遍历邮箱tx_msg_queue中查看是否有消息,如果找到匹配的消息,接收线程接收处理该消息后通知发送线程恢复执行。
  2. 如果没找到匹配的消息,将接收线程放入rx_msg_queue,挂起当前线程并等待接收消息超时。

代码详细分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
int k_mbox_get(struct k_mbox *mbox, struct k_mbox_msg *rx_msg, void *buffer,
k_timeout_t timeout)
{
struct k_thread *sending_thread;
struct k_mbox_msg *tx_msg;
k_spinlock_key_t key;
int result;

//设置描述符中接收线程id为当前线程
rx_msg->tx_target_thread = _current;

//锁调度,开始处理消息接收
key = k_spin_lock(&mbox->lock);

//检查邮箱的tx_msg_queue中是否有可用消息
_WAIT_Q_FOR_EACH(&mbox->tx_msg_queue, sending_thread) {
//从线程交换数据中获取发送消息描述符
tx_msg = (struct k_mbox_msg *)sending_thread->base.swap_data;

//将收发消息描述进行匹配
if (mbox_message_match(tx_msg, rx_msg) == 0) {
//该消息和接收线程匹配,将发送消息的线程从邮箱tx_msg_queue中取出
z_unpend_thread(sending_thread);

k_spin_unlock(&mbox->lock, key);

//处理数据,数据处理完后会让发送线程就绪并重新调度。如果这里buffer为空,会将数据保留,同时发送线程仍然处理挂起状态。
result = mbox_message_data_check(rx_msg, buffer);

return result;
}
}

//没有匹配的消息,同时也不等待,退出本次接收
if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
k_spin_unlock(&mbox->lock, key);
return -ENOMSG;
}

//如果没有匹配的消息,当前线程被加入rx_msg_queue中,等待超时
_current->base.swap_data = rx_msg;
result = z_pend_curr(&mbox->lock, key, &mbox->rx_msg_queue, timeout);

//接收到消息进行消息处理,数据处理完后会让发送线程就绪并重新调度。如果这里buffer为空,会将数据保留,同时发送线程仍然处理挂起状态。
if (result == 0) {
result = mbox_message_data_check(rx_msg, buffer);
}

return result;
}

消息匹配

从前面的分析看到消息匹配是使用mbox_message_match对收发消息描述符进行比较,比较过程中主要有如下事项:

  1. 比较收发的thread id是否匹配
  2. 进行thread id交换
  3. 进行info交换
  4. 更新实际可以接收的size
  5. 将消息的数据地址更新到接收消息描述符内
  6. 将发送线程id更新到接收消息描述符中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static int mbox_message_match(struct k_mbox_msg *tx_msg,
struct k_mbox_msg *rx_msg)
{
uint32_t temp_info;

//匹配thread id
if (((tx_msg->tx_target_thread == (k_tid_t)K_ANY) ||
(tx_msg->tx_target_thread == rx_msg->tx_target_thread)) &&
((rx_msg->rx_source_thread == (k_tid_t)K_ANY) ||
(rx_msg->rx_source_thread == tx_msg->rx_source_thread))) {

//更新收发者的id,主要是給一方是K_ANY用,否则不会有变化
rx_msg->rx_source_thread = tx_msg->rx_source_thread;
tx_msg->tx_target_thread = rx_msg->tx_target_thread;

//交换info
temp_info = rx_msg->info;
rx_msg->info = tx_msg->info;
tx_msg->info = temp_info;

//计算实际可以接收的数据
if (rx_msg->size > tx_msg->size) {
rx_msg->size = tx_msg->size;
}

//将消息的数据地址更新到接收消息描述符内
rx_msg->tx_data = tx_msg->tx_data;
rx_msg->tx_block = tx_msg->tx_block;
if (rx_msg->tx_data != NULL) {
rx_msg->tx_block.data = NULL;
} else if (rx_msg->tx_block.data != NULL) {
rx_msg->tx_data = rx_msg->tx_block.data;
} else {
/* no data */
}

//将发送线程id更新到接收消息描述符中
rx_msg->_syncing_thread = tx_msg->_syncing_thread;

return 0;
}

return -1;
}

消息处理

邮箱消息数据的处理有内部函数mbox_message_data_check和外部函数k_mbox_data_get两个。在k_mbox_get传入的参数buffer=NULL时,mbox_message_data_check检查到时消息会被保留在邮箱。之后可以再通过k_mbox_data_get来读取该消息,如果此时buffer=NULL该消息就会从邮箱中取出丢弃。
代码分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static int mbox_message_data_check(struct k_mbox_msg *rx_msg, void *buffer)
{
if (buffer != NULL) {
//buffer不为NULL将数据读到buffer中
k_mbox_data_get(rx_msg, buffer);
} else if (rx_msg->size == 0U) {
//buffer为空,且size为0,表示为空消息无需接收数据,直接将消息从邮箱中取出丢弃
mbox_message_dispose(rx_msg);
} else {
//buffer为NULL且不是空消息,表示应用后续会用k_mbox_data_get取数据
}

return 0;
}

void k_mbox_data_get(struct k_mbox_msg *rx_msg, void *buffer)
{
//buffer为空,不需要消息数据,直接丢弃该消息
if (buffer == NULL) {
rx_msg->size = 0;
mbox_message_dispose(rx_msg);
return;
}

//不是空消息,将消息数据copy到buffer中
if ((rx_msg->tx_data != NULL) && (rx_msg->size > 0U)) {
(void)memcpy(buffer, rx_msg->tx_data, rx_msg->size);
}

//空消息,无需接收数据,直接将消息从邮箱中取出丢弃
mbox_message_dispose(rx_msg);
}

mbox_message_dispose再来看取出消息丢弃的流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
static void mbox_message_dispose(struct k_mbox_msg *rx_msg)
{
struct k_thread *sending_thread;
struct k_mbox_msg *tx_msg;

// 消息被接收被通知的thread会变为NULL,因此不用再丢弃,这季节返回
if (rx_msg->_syncing_thread == NULL) {
return;
}

if (rx_msg->tx_block.data != NULL) {
rx_msg->tx_block.data = NULL;
}

//通过将被通知thread设置为NULL,表示该消息已经被接收,并从邮箱丢弃
sending_thread = rx_msg->_syncing_thread;
rx_msg->_syncing_thread = NULL;


//更新实际接收数据的大小
tx_msg = (struct k_mbox_msg *)sending_thread->base.swap_data;
tx_msg->size = rx_msg->size;

#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)
//异步发送,消息接收完毕,邮箱中已删除该消息,使用信号量进行通知
if ((sending_thread->base.thread_state & _THREAD_DUMMY) != 0U) {
struct k_sem *async_sem = tx_msg->_async_sem;

mbox_async_free((struct k_mbox_async *)sending_thread);
if (async_sem != NULL) {
k_sem_give(async_sem);
}
return;
}
#endif

//同步发送,消息接收完毕,邮箱中已删除该消息,通知发送者退出等待状态,发送线程变为ready,并重新调度
arch_thread_return_value_set(sending_thread, 0);
z_mark_thread_as_not_pending(sending_thread);
z_ready_thread(sending_thread);
z_reschedule_unlocked();
}

异步支持

邮箱支持异步,在发送者发送消息后可以立即退出不用等到被接收后才退出。通过下面手段实现异步支持:

  1. 初始化时创建指定数量的异步消息,里面包含一个发送消息描述符和一个用于等待消息被接收的dummy thread
  2. 发送异步消息时,将发送描述符存放在异步消息内,并将等待的同步_syncing_thread设置为dummy thread,并指定通知接收完成的信号量
  3. 接收消息完成后通过信号量通知接收已经完成

异步消息支持的数量受CONFIG_NUM_MBOX_ASYNC_MSGS配置限制。
这里我们主要分析异步消息初始化和发送的流程,其它都已经在前文分析的代码中进行了注释

异步消息

异步消息的结构如下

1
2
3
4
struct k_mbox_async {
struct _thread_base thread; //dummy thread
struct k_mbox_msg tx_msg; //发送消息描述符
};

初始化

初始化建立异步消息的stack,并将声明好的异步消息放入stack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static int init_mbox_module(const struct device *dev)
{
ARG_UNUSED(dev);

//声明CONFIG_NUM_MBOX_ASYNC_MSGS个异步消息
static struct k_mbox_async __noinit async_msg[CONFIG_NUM_MBOX_ASYNC_MSGS];


int i;

for (i = 0; i < CONFIG_NUM_MBOX_ASYNC_MSGS; i++) {
//将异步消息内的thread设置为dummy状态
z_init_thread_base(&async_msg[i].thread, 0, _THREAD_DUMMY, 0);
//将异步消息压入堆栈内
k_stack_push(&async_msg_free, (stack_data_t)&async_msg[i]);
}

return 0;
}

//在Zephyr初始化的PRE_KERNEL_1阶段会调用init_mbox_module
SYS_INIT(init_mbox_module, PRE_KERNEL_1, CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);

异步发送

异步发送和同步发送的主要流程一致,差别就是使用异步消息和dummy thread进行等待,的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void k_mbox_async_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg,
struct k_sem *sem)
{
struct k_mbox_async *async;

//分配异步消息
mbox_async_alloc(&async);

//初始化dummy thread的优先级
async->thread.prio = _current->base.prio;

//将发送消息存放在异步消息内
async->tx_msg = *tx_msg;
//将dummy_thread做为被通知thread进行等待,当前thread调用k_mbox_async_put可立即返回
async->tx_msg._syncing_thread = (struct k_thread *)&async->thread;
//将通知信号量保存在异步信息中
async->tx_msg._async_sem = sem;

//发送消息
(void)mbox_message_put(mbox, &async->tx_msg, K_FOREVER);
}

上面出现了对异步消息的堆栈的处理函数,其实就是对初始化时异步消息堆栈的处理,发送时pop出一个空的异步消息,接收完后将异步消息又加入到堆栈中

1
2
3
4
5
6
7
8
9
static inline void mbox_async_alloc(struct k_mbox_async **async)
{
(void)k_stack_pop(&async_msg_free, (stack_data_t *)async, K_FOREVER);
}

static inline void mbox_async_free(struct k_mbox_async *async)
{
k_stack_push(&async_msg_free, (stack_data_t)async);
}

从前面的分析我们知道初始化时空闲的异步消息只有CONFIG_NUM_MBOX_ASYNC_MSGS个。当空闲的异步消息用完后stack就为空了,此时再发送异步消息就会因为stack pop的特性发生阻塞,k_mbox_async_put被挂起,直到有异步消息被接收后push回stack。
关于stack的特性可以参考Zephyr内核对象-数据传递之Stack

参考

https://docs.zephyrproject.org/latest/reference/kernel/data_passing/mailboxes.html