VPP接口INPUT节点运行数据
在设置virtio接口接收/发送队列函数的最后,更新接口的运行数据。
void virtio_vring_set_rx_queues (vlib_main_t *vm, virtio_if_t *vif)
{ ...vnet_hw_if_update_runtime_data (vnm, vif->hw_if_index);
}
void virtio_vring_set_tx_queues (vlib_main_t *vm, virtio_if_t *vif)
{...vnet_hw_if_update_runtime_data (vnm, vif->hw_if_index);
接口运行数据
更新硬件接口的运行数据,参数hw_if_index指定所要更新的硬件接口的索引值。
void
vnet_hw_if_update_runtime_data (vnet_main_t *vnm, u32 hw_if_index)
{ vlib_main_t *vm = vlib_get_main ();vnet_hw_if_output_node_runtime_t *new_out_runtimes = 0;int something_changed_on_rx = 0;int something_changed_on_tx = 0;clib_bitmap_t *pending_int = 0;int last_int = -1;log_debug ("update node '%U' triggered by interface %v",format_vlib_node_name, vm, node_index, hi->name);
根据VPP系统的线程数量,分配d,a向量空间,并且将per_thread_node_state和per_thread_node_adaptive两个向量初始化为无效状态。其中,后边两个向量用于保存接口的input节点在每个worker线程中的新的运行模式:中断或者轮询,以及自适应adaptive(非独立模式,标志位)。
d,a两个向量分别保存每个线程中新的轮询模式队列,和自适应队列的值。
vnet_hw_if_rxq_poll_vector_t *pv, **d = 0, **a = 0;vlib_node_state_t *per_thread_node_state = 0;u32 n_threads = vlib_get_n_threads ();u16 *per_thread_node_adaptive = 0;vec_validate (d, n_threads - 1);vec_validate (a, n_threads - 1);vec_validate_init_empty (per_thread_node_state, n_threads - 1, VLIB_NODE_STATE_DISABLED);vec_validate_init_empty (per_thread_node_adaptive, n_threads - 1, 0);
以下找到每个线程中input节点的新的运行状态。
遍历VPP接口模块中的接收队列池(hw_if_rx_queues),根据接收队列成员(hw_if_index)找到其对应的硬件接口,以下处理那些硬件接口与参数中指定硬件接口具有相同input_node_index的接收队列。
即在相同的input节点进行处理的硬件接口。
vnet_interface_main_t *im = &vnm->interface_main;vnet_hw_if_rx_queue_t *rxq;vnet_hw_interface_t *hi = vnet_get_hw_interface (vnm, hw_if_index);u32 node_index = hi->input_node_index;/* find out desired node state on each thread */pool_foreach (rxq, im->hw_if_rx_queues){u32 ti = rxq->thread_index;vnet_hw_interface_t *rxq_hi;ASSERT (rxq->mode != VNET_HW_IF_RX_MODE_UNKNOWN);ASSERT (rxq->mode != VNET_HW_IF_RX_MODE_DEFAULT);rxq_hi = vnet_get_hw_interface (vnm, rxq->hw_if_index);if (rxq_hi->input_node_index != node_index)continue;
根据这些接收队列的模式,初始化VPP处理线程节点的运行状态,轮询状态优先,即如果存在轮询模式的接收队列,input优先使用轮询状态:
- 接收队列模式为轮询(RX_MODE_POLLING),线程input节点状态也设置为轮询(NODE_STATE_POLLING)。
- 接收队列模式为中断/自适应(RX_MODE_INTERRUPT/ADAPTIVE),线程input节点状态设置为中断(NODE_STATE_INTERRUPT)。
轮询状态(NODE_STATE_POLLING)的input节点,在线程的主循环中每次都执行。中断状态的input节点,在接收到设备发送的中断请求时运行。
if (rxq->mode == VNET_HW_IF_RX_MODE_POLLING) {per_thread_node_state[ti] = VLIB_NODE_STATE_POLLING;per_thread_node_adaptive[ti] = 0;}if (per_thread_node_state[ti] == VLIB_NODE_STATE_POLLING)continue;if (rxq->mode == VNET_HW_IF_RX_MODE_INTERRUPT || rxq->mode == VNET_HW_IF_RX_MODE_ADAPTIVE)per_thread_node_state[ti] = VLIB_NODE_STATE_INTERRUPT;if (rxq->mode == VNET_HW_IF_RX_MODE_ADAPTIVE)per_thread_node_adaptive[ti] = 1;}
以下再次遍历接收队列池,找到在相同input节点进行处理的硬件接口的接收队列。
vnet_hw_if_rxq_poll_vector_t *pv,/* construct per-thread polling vectors */pool_foreach (rxq, im->hw_if_rx_queues){u32 ti = rxq->thread_index;vnet_hw_interface_t *rxq_hi;rxq_hi = vnet_get_hw_interface (vnm, rxq->hw_if_index);if (rxq_hi->input_node_index != node_index)continue;
对于中断或者adaptive模式的接收队列,记录在pool中的最大索引值。如果队列处理线程节点为adaptive模式,分配结构(vnet_hw_if_rxq_poll_vector_t),保存dev_instance和queue_id,分别为接口设备的实例索引和队列索引。
如果队列处理线程节点工作在轮询POLLING模式,也分配pv结构,保存接收队列信息。
if (rxq->mode == VNET_HW_IF_RX_MODE_INTERRUPT ||rxq->mode == VNET_HW_IF_RX_MODE_ADAPTIVE)last_int = clib_max (last_int, rxq - im->hw_if_rx_queues);if (per_thread_node_adaptive[ti]) {vec_add2_aligned (a[ti], pv, 1, CLIB_CACHE_LINE_BYTES);pv->dev_instance = rxq->dev_instance;pv->queue_id = rxq->queue_id;}if (per_thread_node_state[ti] != VLIB_NODE_STATE_POLLING)continue;vec_add2_aligned (d[ti], pv, 1, CLIB_CACHE_LINE_BYTES);pv->dev_instance = rxq->dev_instance;pv->queue_id = rxq->queue_id;}
遍历VPP系统的线程,首先,对当前线程需要处理的所有pv结构,即所有设备和队列,进行排序。
/* sort poll vectors and compare them with active ones to avoid unnecesary barrier */for (int i = 0; i < n_threads; i++){vlib_main_t *ovm = vlib_get_main_by_index (i);vlib_node_state_t old_state;vec_sort_with_function (d[i], poll_data_sort);
其次,获取硬件接口的input节点(node_index),在当前线程中的运行状态,如果与新的状态不同,设置修改标志。
old_state = vlib_node_get_state (ovm, node_index);if (per_thread_node_state[i] != old_state) {something_changed_on_rx = 1;log_debug ("state changed for node %U on thread %u from %s to %s",format_vlib_node_name, vm, node_index, i,node_state_str[old_state],node_state_str[per_thread_node_state[i]]);}
否则,在修改标志为0的情况下,进一步进行检查。获取input节点在当前线程的运行数据rt,如果节点处理的中断模式队列数量,与以上得出的数量不同,设置修改标志。接下来通过memcmp比较中断模式队列的内容是否完全相同。
/* check if something changed */if (something_changed_on_rx == 0) {vnet_hw_if_rx_node_runtime_t *rt;rt = vlib_node_get_runtime_data (ovm, node_index);if (vec_len (rt->rxq_vector_int) != vec_len (d[i]))something_changed_on_rx = 1;else if (memcmp (d[i], rt->rxq_vector_int, vec_len (d[i]) * sizeof (**d)))something_changed_on_rx = 1;
进一步比较接收中断的数量是否相等,以及轮询状态的向量是否完全相等。
if (clib_interrupt_get_n_int (rt->rxq_interrupts) != last_int + 1)something_changed_on_rx = 1;if (something_changed_on_rx == 0 && per_thread_node_adaptive[i]) {if (vec_len (rt->rxq_vector_poll) != vec_len (a[i]))something_changed_on_rx = 1;else if (memcmp (a[i], rt->rxq_vector_poll, vec_len (a[i]) * sizeof (**a)))something_changed_on_rx = 1;}}}
发送数据修改判断
如果参数硬件接口的发送队列数量大于0,复制一份原有的线程节点runtimes数据,如下:
if (vec_len (hi->tx_queue_indices) > 0){new_out_runtimes = vec_dup_aligned (hi->output_node_thread_runtimes, CLIB_CACHE_LINE_BYTES);vec_validate_aligned (new_out_runtimes, n_threads - 1, CLIB_CACHE_LINE_BYTES);
遍历每个线程中的发送节点runtime,
for (u32 i = 0; i < vec_len (new_out_runtimes); i++){vnet_hw_if_output_node_runtime_t *rt;rt = vec_elt_at_index (new_out_runtimes, i);u32 n_queues = 0, total_queues = vec_len (hi->tx_queue_indices);rt->frame = 0;rt->lookup_table = 0;
遍历接口所有的发送队列。如果当前发送队列的处理线程向量中,不包含当前的遍历线程,不进行处理。否则,初始化frame结构,添加到线程的runtime结构成员frame向量中。
for (u32 j = 0; j < total_queues; j++) {u32 queue_index = hi->tx_queue_indices[j];vnet_hw_if_tx_frame_t frame = { .shared_queue = 0,.hints = 7, .queue_id = ~0 };vnet_hw_if_tx_queue_t *txq = vnet_hw_if_get_tx_queue (vnm, queue_index);if (!clib_bitmap_get (txq->threads, i))continue;log_debug ("tx queue data changed for interface %v, thread %u (queue_id %u)", hi->name, i, txq->queue_id);something_changed_on_tx = 1;frame.queue_id = txq->queue_id;frame.shared_queue = txq->shared_queue;vec_add1 (rt->frame, frame);n_queues++;}
n_queues表示当前线程需要处理的接口队列数量,如果此值发生变化,使用新值,并且标记改变(something_changed_on_tx)。
// don't initialize rt->n_queues aboveif (rt->n_queues != n_queues) {something_changed_on_tx = 1;rt->n_queues = n_queues;}
这里要求线程处理的队列数量为2的幂,初始化lookup_table,填充队列ID值。lookup_table向量的长度可能大于队列数量,这时lookup_table中存在相同的队列ID。
/* It is only used in case of multiple txq. */if (rt->n_queues > 0) {if (!is_pow2 (n_queues))n_queues = max_pow2 (n_queues);vec_validate_aligned (rt->lookup_table, n_queues - 1, CLIB_CACHE_LINE_BYTES);for (u32 k = 0; k < vec_len (rt->lookup_table); k++) {rt->lookup_table[k] = rt->frame[k % rt->n_queues].queue_id;log_debug ("tx queue lookup table changed for interface %v, (lookup table [%u]=%u)", hi->name, k, rt->lookup_table[k]);}}}
如果接口的队列索引向量(hi->tx_queue_indices)为空,即不存在发送队列,表明接口被删除。
} else/* interface deleted */something_changed_on_tx = 1;
更新runtimes数据
如果接收或者发送队列改变,在处理之前锁住所有的VPP线程。
if (something_changed_on_rx || something_changed_on_tx){int with_barrier;if (vlib_worker_thread_barrier_held ()) {with_barrier = 0;log_debug ("%s", "already running under the barrier");} elsewith_barrier = 1;if (with_barrier)vlib_worker_thread_barrier_sync (vm);
如果接收发生改变,遍历所有线程,更新线程中node节点对应的runtime数据,比如使用线程新的接收队列中断向量(d[i])替换原有的值(rxq_vector_int)。
if (something_changed_on_rx) {for (int i = 0; i < n_threads; i++) {vlib_main_t *vm = vlib_get_main_by_index (i);vnet_hw_if_rx_node_runtime_t *rt;rt = vlib_node_get_runtime_data (vm, node_index);pv = rt->rxq_vector_int;rt->rxq_vector_int = d[i];d[i] = pv;
如果此线程需要处理adaptive模式的接收队列,使用新值(a[i])替换旧值(rxq_vector_poll)。
if (per_thread_node_adaptive[i]) {pv = rt->rxq_vector_poll;rt->rxq_vector_poll = a[i];a[i] = pv;}
如果存在未处理的接收队列中断,将这些中断号保存到pending_int位图中,并记录下最大的中断号(last_int)。
if (rt->rxq_interrupts) {void *in = rt->rxq_interrupts;int int_num = -1;while ((int_num = clib_interrupt_get_next (in, int_num)) != -1) {clib_interrupt_clear (in, int_num);pending_int = clib_bitmap_set (pending_int, int_num, 1);last_int = clib_max (last_int, int_num);}}
接下来设置线程中,节点(node_index)的新状态,以及标志位VLIB_NODE_FLAG_ADAPTIVE_MODE。根据最大中断号,重新设置rxq_interrupts向量的大小。
vlib_node_set_state (vm, node_index, per_thread_node_state[i]);vlib_node_set_flag (vm, node_index, VLIB_NODE_FLAG_ADAPTIVE_MODE, per_thread_node_adaptive[i]);if (last_int >= 0)clib_interrupt_resize (&rt->rxq_interrupts, last_int + 1);elseclib_interrupt_free (&rt->rxq_interrupts);}}
如果发送发生改变,处理相对简单,不需要想接收那样遍历所有线程。如下将new_out_runtimes替换掉接口原有的output_node_thread_runtimes。最后,释放VPP的worker线程锁。
if (something_changed_on_tx) {vnet_hw_if_output_node_runtime_t *t;t = hi->output_node_thread_runtimes;hi->output_node_thread_runtimes = new_out_runtimes;new_out_runtimes = t;} if (with_barrier)vlib_worker_thread_barrier_release (vm);} else log_debug ("skipping update of node '%U', no changes detected",format_vlib_node_name, vm, node_index);
如果存在之前未处理的中断,将其重新设置回新的runtime中。
if (pending_int) { int i; clib_bitmap_foreach (i, pending_int) { vnet_hw_if_rx_queue_set_int_pending (vnm, i);}clib_bitmap_free (pending_int);}