由于平时工作内容是LLM 推理加速,所以需要使用c++来实现并适配decoding阶段的beam search方法。下面将以intel neural-speed v1.0的代码为例进行讲解和记录。

neural_speed以早期的llama.cpp为基础构建而来,增加了intel平台的kernel优化以及一些新的推理优化技术(比如streaming-llm, continuous batching)等,在server和client的机器上都有一些不错的效果。

beam search的基本思想

参考博客:十分钟读懂beam search, LLM在解码时如何生成文本

语言模型在生成下一个token时,理论做法是根据前面的token来一起做最大似然估计。从实现上来说,如果要生成长度为L的新文本,假设模型的n_vocabN,就要比较NLN^{L}次组合,选择连乘概率最大的,这么多次模型推理次数肯定是不现实的。另一种token generation方法Greedy Search是使用局部最优的方式,每次选择推理结束时logits最大的那个token id,虽然速度快,但是句子的质量一般不会太好。

beam search是介于两者之间的,通过设置beam size来做速度和质量的trade off.简单来说就是每次只选beam size个组合,生成句子的内容可能在下一个步骤因为其分数值过低而被替换,但是总体依旧保持在固定数量的备选组合中。

下面这个图简单示意了beam search (beam size =2 ) 的过程,不过实际情况中可能ABD可能会被(假设)CEC取代,不一定会是一直保持A开头。

transformers的实现

为了快速并且规范的了解beam search的工程实现,我们以huggingface transformers v4.38.2为例.

beam search接口

beam search scorer

beam hypotheses

主函数前面都是一些准备和校验工作,我们直接从logits获得那部分开始看。

首先是对logits进行处理,code. logits→logsoftmax→logits_processor→beam scores. 其中logits_process是针对一些用户配置的generation_config做的处理, 我们这里以min_new_tokens为例,logits_processor会dispatch到min_new_token_logits_processor,对未达到min_new_tokens长度的beam的当前新logits的eos_token_id位置的logit值置为负无穷,强制让其选不到eos token,即使该token的概率值可能是最大的。 beam scores部分就是将当前token logits 分数累加该beam的之前的分数。logits的shape为[bs * beam_size, 1, n_vocab], beam scores的shape是[bs * beam_size, 1],相加会做broadcast操作。

接下来(code),选取top_k个next tokens,并且计算出token id以及beam indices.其中把next_token_scores reshape成[bs, beam_size * vocab]这一步是为了在每个batch的所有beam上选top-k, 而不是一个个beam选最后再放一起合并,但实际上我是对其reshape成[bs * beam_size, vocab]来做的,因为我是考虑到每个batch的beam_szie可能不同,比如server服务或者一开始进行beam_search之前的prompt的num_beam实际是1,后面才是beam_size个,另一方面把omp对第一维的for并行增大一些。还有一个要注意的是为什么top_k的k是大于等于2 * beam_size,为什么不直接是beam_size,这样不是刚好就可以足够更新了吗?这是因为transformers实现里面还有一个单独的容器去存储提前达到eos_token的beam,这样的话剩下的几个beam 数量就小于beam_size个,缺少的beam我们会继续从top_k中的池子去继续补充,因为有可能补充进来的beam最后的score不一定会比提前到达eos_token的beam差,所以选2倍的话就是如果每个beam都恰好到了eos_token,那么还剩下beam_size个可以补充(每个beam在vocab中选两个,所以不可能剩下的beam_size个beam还有的会到eos_token,所以一定可以打满)。最后得到之后即使确定top_k个next_token分别是什么,以及它属于哪个beam_indices.

然后通过BeamScorer这个类来接受上面的信息更新beam.我们先看下这个类都有哪些成员code. 其中length_penalty和do_early_stopping都是generation_config中的两个配置,前者是对beam的tokens长短进行进一步的打分,后者影响最后beam输出的时机和比较的策略,这两个我们后面会再讲。里面group的概念是对beam_size这个维度的处理再做一个切分,暂时不知道是什么用处,我在C++实现里也暂时忽略了这个变量,我们就把它直接当成是1处理。我们重点看self._beam_hyps和self._done这两个变量,前者就是装到了eos_token的beam的容器BeamHypotheses集合,大小是batch_size个,每个BeamHypotheses的大小是beam_size个,后者代表所有batch_size的beam search都结束了才能结束整个流程,所以是self._done.all(),每个batch是否结束会看BeamHypotheses,这个类也有一个is_done的函数,这个函数是由early_stopping来进行控制的,写的比较绕,我们一步步来看。

self._done[batch_group_idx] = self._done[batch_group_idx] or self._beam_hyps[batch_group_idx].is_done(
next_scores[batch_idx].max().item(), cur_len, decoder_prompt_len
)

def is_done(self, best_sum_logprobs: float, cur_len: int, decoder_prompt_len: Optional[int] = 0) -> bool:
"""
If there are enough hypotheses and that none of the hypotheses being generated can become better than the worst
one in the heap, then we are done with this sentence.
"""

if len(self) < self.num_beams:
return False

# `True`: stop as soon as at least `num_beams` hypotheses are finished
if self.early_stopping is True:
return True
# `False`: heuristic -- compute best possible score from `cur_len`, even though it is not entirely accurate
# when `length_penalty` is positive. See the discussion below for more details.
# https://github.com/huggingface/transformers/pull/20901#issuecomment-1369845565
elif self.early_stopping is False:
highest_attainable_score = best_sum_logprobs / (cur_len - decoder_prompt_len) ** self.length_penalty
ret = self.worst_score >= highest_attainable_score
return ret
# `"never"`: compute the best possible score, depending on the signal of `length_penalty`
else:
# `length_penalty` > 0.0 -> max denominator is obtaned from `max_length`, not from `cur_len` -> min
# abs(`highest_attainable_score`) is obtained -> `highest_attainable_score` is negative, hence we obtain
# its max this way
if self.length_penalty > 0.0:
if self.max_length <= decoder_prompt_len:
raise ValueError("max_length is not larger than decoder prompt length")
highest_attainable_score = (
best_sum_logprobs / (self.max_length - decoder_prompt_len) ** self.length_penalty
)
# the opposite logic applies here (max `highest_attainable_score` from `cur_len`)
else:
highest_attainable_score = best_sum_logprobs / (cur_len - decoder_prompt_len) ** self.length_penalty
ret = self.worst_score >= highest_attainable_score
return ret

1). 容量是beam_size个,没塞满,返回false,容易理解,不多赘述

2). 塞满了beam_size个,但是early_stopping值是true,那么返回true,意思是说我也不会去比较还没塞进去的剩下的那些没到eos_token的beam(top_k那里有解释),直接返回这个hypotheses里面的最大score的为最终的generated tokens.

3). 塞满了beam_size个,但是early_stopping值是false,这个时候是比较这个BeamHypotheses里面最小score是不是比当前这个batch的所有beam的最大score都大,是的话返回true,表示后面再继续下去也不会更大了,否则就返回false,继续扩展beam.这里的疑问是,为什么是选择最小的去比所有beam里面最大的,而不是最大比最大?第一是beam的最终分数,是加入BeamHypotheses时候才最终确定的,除了累加logsoftmax之外,还要除以gen_len ** length_penaltygen_len是新生成的new token的长度,length_penalty可正可负,默认值是1,为正的时候分母越来越大,由于logsoftmax是负数,累加越来越小,所以最终score是变大的,所以是偏好长的结果,为负的时候分母越来越大,所以最终score变小,所以是偏好短的句子。第二,和所有beam的最大score比的时候,这个score也要除以gen_len ** length_penalty来确定最后分数,假如length_penalty为负数,那么以后再继续扩展beam也不会比当前大(注意,继续扩展每个beam都有同样的gen_len),所以肯定返回true是没问题的,甚至我觉得拿最大比最大也没啥逻辑问题(当然这里可能还有一个原因就是返回的不一定是top1的beam,可能返回几个备选结果出来)。如果length_penalty为正,这个时候你不能保证继续拓展beam不会让score变大,毕竟gen_len带动分母变大,所以这个我觉得你拿最大或者最小比所有当前beam的最大也不好说。但是代码的注释解释了一下这个是启发式的,对结果影响比较小,同时也能减少迭代次数(加速)。

4). 塞满了beam_size个,但是early_stoppingnever (对,你没看错,在transformers里,early_stopping是三元的,参见上面的注释解释部分)。这个是为了mathematically correct,毕竟beam search理论中不存在什么early_stopping,这个开关只是为了工程近似和加速。这里就是根据length_penalty的正负和新生成的长度来精确确定最大score.但是实际上我也没怎么看到有人用这种方式。

综上,BeamHypotheses是存放最后要选择出最好的beam的池子,里面的beam可能是带了eos_token,也可能没有,这个由early_stopping确定。BeamScorer的process过程就是往里面加hypo,finalize的过程就是整理池子并选出来,is_done()的作用就是告诉finalize需不需要整理池子(主要是节省next token计算量)。我在后面会接着讲这两个过程。

我自己在C++实现时候对is_done的逻辑做了简化,我只给early_stopping设置true或false两个选项,true就是为了提前结束(eos_token), false就是数学上一个个比较选出最大的。因为我们的工作主要关注在server CPU端,bs不会像GPU那样打得很大,所以计算量上相比于transformers的逻辑不会差太多,而且还容易理解。

for batch_idx in range(batch_size):
batch_group_idx = batch_idx * self.num_beam_groups + group_index
if self._done[batch_group_idx]:
if self.num_beams < len(self._beam_hyps[batch_group_idx]):
raise ValueError(f"Batch can only be done if at least {self.num_beams} beams have been generated")
if eos_token_id is None or pad_token_id is None:
raise ValueError("Generated beams >= num_beams -> eos_token_id and pad_token have to be defined")
# pad the batch
next_beam_scores[batch_idx, :] = 0
next_beam_tokens[batch_idx, :] = pad_token_id
next_beam_indices[batch_idx, :] = 0
continue

# next tokens for this sentence
beam_idx = 0
for beam_token_rank, (next_token, next_score, next_index) in enumerate(
zip(next_tokens[batch_idx], next_scores[batch_idx], next_indices[batch_idx])
):
batch_beam_idx = batch_idx * self.group_size + next_index
# add to generated hypotheses if end of sentence
if (eos_token_id is not None) and (next_token.item() in eos_token_id):
# if beam_token does not belong to top num_beams tokens, it should not be added
is_beam_token_worse_than_top_num_beams = beam_token_rank >= self.group_size
if is_beam_token_worse_than_top_num_beams:
continue
if beam_indices is not None:
beam_index = beam_indices[batch_beam_idx]
beam_index = beam_index + (batch_beam_idx,)
else:
beam_index = None

self._beam_hyps[batch_group_idx].add(
input_ids[batch_beam_idx].clone(),
next_score.item(),
beam_indices=beam_index,
generated_len=cur_len - decoder_prompt_len,
)
else:
# add next predicted token since it is not eos_token
next_beam_scores[batch_idx, beam_idx] = next_score
next_beam_tokens[batch_idx, beam_idx] = next_token
next_beam_indices[batch_idx, beam_idx] = batch_beam_idx
beam_idx += 1

# once the beam for next step is full, don't add more tokens to it.
if beam_idx == self.group_size:
break

if beam_idx < self.group_size:
raise ValueError(
f"At most {self.group_size} tokens in {next_tokens[batch_idx]} can be equal to `eos_token_id:"
f" {eos_token_id}`. Make sure {next_tokens[batch_idx]} are corrected."
)

# Check if we are done so that we can save a pad step if all(done)
self._done[batch_group_idx] = self._done[batch_group_idx] or self._beam_hyps[batch_group_idx].is_done(
next_scores[batch_idx].max().item(), cur_len, decoder_prompt_len
)

return UserDict(
{
"next_beam_scores": next_beam_scores.view(-1),
"next_beam_tokens": next_beam_tokens.view(-1),
"next_beam_indices": next_beam_indices.view(-1),
}
)

兜了一圈后,我们现在具体来看BeamScorer这个类如何更新beam,也就是process函数做了哪些事。我们跳过变量准备部分,直接看两个大for,第一个for是batch维度分开处理,第二个for是对选出的next tokens处理(top_k采出了2* beam_size个)。第一个for里面首先对已经done的batch (done不done由它对应的BeamHypotheses决定)做了padding处理,这样就是防止index乱了,实际上我觉得可以做batch reductio,只不过会带来管理的难度,可以理解transformers为啥不这么做。接下来开始正式更新beam,next token等变量带来的信息有:token_id,累加的logsoftmax值,已经这个token是属于哪个beam分支(beam_index)。如果next_token是eos_token, 按照之前说的,这个beam的token generate完了,应该add到self._beam_hyps中,否则就作为下一个token进行推理。但是这里做了一个判断就是如果这个next_eos_token不是top_beam_size里面的,就不应该被放进self._beam_hyps(数学上好像没有合理的解释?),具体的原因我觉得是beam search理论上就是每步都维持在top_beam_size个,而next_beam_scores永远放满beam_size个应该是为了各种固定的shape考量,维持其不变。最后更新完beams之后检查下这个batch是否done.

到现在这一步,我们完成了从获取模型推理的logits到打分并选择top_k next token,到更新beams状态,接下来,模型会拿到next_token进行推理,然后判断所有的batch是否都结束了(is_done)或者到了指定的最大生成长度,未结束就继续重复前面的步骤。

在推理前其实模型还会更新一下kv cache, 这里如何更新的我并没有仔细研究,看样子是交给了不同的模型去实现自己的版本,本质上要做的事就是根据你选出的非eos_token对应的上一个的beam_index来重排一下kv cache,比如说上次的beam indices是[0,1,2,3],选出的next_token对应的beam_index是[2,1,1,0] (假设排序顺序是按照beam_score降序),那么新的kv_cache更新伪代码则是kv_cache_new[i] = kv_cache[j] for i in [0,1,2,3], j in [2,1,1,0].当然重新开辟一份内存比较低效,实际中我估计可能是swap操作(具体实现不得而知),我在c++的实现中是使用原地memcpy操作(按照顺序),如果框架使用了paged_attention或者indirect kv cache,可能swap操作会比较方便。

这里有一个小问题是刚送进beam_search函数的prompt是如何推理的?因为不足beam_size个,为了兼容while,要么是扩张input到beam_size个,要么是扩张logits到beam_size个,但是这两种在while的top_k里都会选出重复的token, 而非正确的。看注释送进来的inputs就已经是beam_size个了,所以也不知道是怎么处理的。我在C++实现里是把这部分与while True的next_token分开处理的。如果有小伙伴知道transformers是如何处理first token的,欢迎留言告诉我~

最后如果打破循环之后,使用BeamScorer来进行finalize, code. 逻辑也比较简单,就是for batch去检查beam_hyps,done的话就continue,没有的话就把剩余的beam往里面add(beam_hyps自己根据score维持到beam_size个beam).然后选择每个batch对应的beam_hyps的最大的beam作为结果输出。这里面还有一些代码是为了找出每个batch的最大length,然后padding不足这个最大length的剩余部分,以维持一些兼容性。

c++实现

根据上面对transformers beam_search的代码分析,并对一些逻辑进行简化(比如early_stopping判断is_done), 大致确定了复现的框架,如下图。

  • 对输入的prompt进行推理,拿到top beam_size个next_token,组成current_beams

  • 拷贝kv_cache beam index 0到其他beam,准备好kv_cache

  • 从current_beams拿到模型输入,inference 一次

  • 拿到原始logits,对其进行处理,这里只有一个min_new_tokens 对应的logits_processor

  • 打分:对处理后的logits进行logsoftmax处理,并累加对应beam的score

  • 从打分后的logits选出top_k个 (2*beam_size)

  • 根据top_k的next_token等信息,进行筛选,是eos_token并且在top_beam_size内就add进beam_hypothesis中,否则组成next_beams

  • 根据cur_beams和next_beams的beam_indices来进行kv cache update,为下一次model的inference做准备

  • 检查是否is_done, 条件是根据beam_hypothesis是否满(do early_stopping)或到了max_new_tokens

  • 非done,swap next_beams和current_beams内存,继续从第一步开始

  • 是done,根据beam_hypothesis是否done来决定加不加入剩余的beams,最终返回其中的top_1 beam最为输出。

除此之外,我们在整个beam search有关的流程中加了一些打印,方便在debug截断看到每一步的结果,也容易来和transformers库的结果比较 (FP32 data type)。

1). 先看一下beam_next_token, beambeam_hypotheses的定义,这三者是依次“包含”的关系,code. 这三个struct没什么好说的,其中beam中的request_idx代表着输入的prompt的id,是狭义上的batch_size维度,这个是为了对应多batch推理而加的,也会和后面的batch reduction以及未来要说的continuous batching有关。beam_hypotheses中的is_done函数如前所述,对early_stopping为False和never做了简化。

2). 对logits的处理:logits_processor(code),对不到min_new_tokens的beams进行eos_token mask 负无穷处理,代码比较直接;logits_info(code),计算log_softmax,选择top_k个next_token. 这两者都加了batch维度的处理,所以要注意一下bs_stride以及只选最后一个token的vector的offset的计算。在logits_info中,softmax的计算参考了比较常见的数值稳定的实现,即减去最大值再做exp,exp也是调用了标准库而非近似计算,为了保证精度(近似计算的实验未做,具体能差多少,能带来多少速度提升暂时不知道),但并未对所有vocab_size个数值进行exp+log(分子上),仅对需要的top_k个做,这样可以降低一些计算量,此外还针对batch_size维度做了并行(实际上可以在vocab维度上分段并行来寻求max_element值,虽有提升但对整体影响较小,这里为了简洁未采用)。top_k next_token通过最小堆来实现(vector+std::make_heap),对每个batch维度返回一个大小为top_k的最小堆。

logits的处理上,batch的大小并不是简单等于prompt batch_size,也不一定等于prompt_batch_size * beam_size,而是running_prompt_batch_size * beam_size,因为有batch reduction (会有提前结束的某个请求)。

3). 对kv_cache的处理。Neural_Speed的kv_cache是预先申请好的一块足够的buffer,在beam_search中的update主要通过类beam_search_kv_cache_reorder来实现,其更新的主要代码在这里。我们首先考虑一下kv_cache update需要哪些信息和步骤:

  • 在首次推理时,每个prompt我们是只推一次而非repeat prompt到beam_size个,也就是说只会填充beam_index 为0的一个kv cache块,其他块是空,所以我们要对刚推完prompt的kv_cache进行repeat (prefill之后memcpy kv_cache,非paged_attention)

  • 在其后的generation阶段,会根据新的score来更新beams (cur_beams→next_beams),假设beam_size为4,那么推理之前的beam_indices是[0,1,2,3],推理之后的beam_indices假设变成了[0,1,1,2],即next_beams现在把原先的cur_beams的indices=2和3的位置的beam丢弃了,换成了cur_beams的indices=1和2的位置的beam (加新的next_token)。那么,我们要把kv_cache中indices=1的值拷贝到indices=2的位置,把原来indices=2的值拷贝到indices=3的位置。为了完成这样的拷贝,第一我们要知道推理之后的indices,第二,我们按照顺序拷贝,否则会丢失原来的数据,比如上面的例子,应该先拷贝2到3,然后才能拷贝1到2,将这两个信息合在一起,就是给出一个带顺序的拷贝的src和dst idx的容器即可。

  • 每个模型的kv_cache存储的layout不一定一样,简单点说,就是shape可能不一样,这对memcpy造成了困难,一种方法就是每种模型实现一套自己的拷贝方法,然后在最上层的拷贝方法中去dispatch. 考虑到现在的LLM架构基本类似(gpt decoder-only),k和v的shape基本都是head_dim, head_size, seq_lenbatch_size的组合形式,因此我们就直接在写每个modeling的时候将 k.shape = [head_dim, head_k, seq_len], v.shape = [seq_len, head_dim, head_v] 固化下来(both permute好的shape),这样就可以方便我们只写一种memcpy的方式。假如以后出现了与之不同的情况,也可以通过继承的方式复写对应子类的update函数,保证了一定的灵活性。

  • 考虑到会有batch reduction,因此for-loop要注意看管好正在running的prompt的batch idx.,beam_size是不会reduction的,参考上面的对transformers的代码解析。

4). 备选beam的处理,基本上和transformers的逻辑一样,做了一些简化(有些transformers的处理逻辑不是很理解,所以没加),具体c++实现流程参考图和代码(1, 2)。

整个的c++的大致流程可以参考这个函数. 也可以用类似于了transformers的方式来验证c++ beam_search的正确性,参考这里.

continuous batching

细心的朋友可能发现beam_search class除了loop和其包含的成员函数外,还有其他没提到的成员函数,这些函数是为了continuous-batching准备的,具体来说就是将loop拆开成每个step,让外面的调度器来根据实际负载情况单步执行beam_search,执行完时候也会放出完成的request,然后插入新的request. 本质上是多了一些调度处理和first-token和next-token等特殊情况的分叉,核心的东西基本上都是一样的。continuous batching机制的相关代码主要在pool.hscheduler.h里,代码逻辑比较简单,也支持了除beam-search外的greedy-search和top_p等sampling的方法。有兴趣的朋友可以自行查阅下。相关资料:code example, doc

进一步优化

  • prompt-prefix共用一块内存,这个可以用paged kv cache store,也可以对kv cache进行抽象分级存储(system-prompt, initial-tokens, context-tokens, local-contexts等),然后查找重用.

  • paged kv cache存储或许对kv cache的swap比较友好,但是需要重新开发cpu kernel.