Using AIAK for Inference Acceleration
Last updated: 2024-03-08
1. Preparation
AIAK inference acceleration image
The prepackaged AIAK inference acceleration image is currently available at:
registry.baidubce.com/cce-ai-native/aiak-inference-llm:1.3.1
Model weights
Save the open-source Hugging Face model weights to a PFS mount directory (for example, /mnt/models/aiak-huggingface/).
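For reference, a Hugging Face model directory normally contains the config, tokenizer files, and weight shards (*.bin or *.safetensors). A quick sanity check might look like the following; the model folder name is only an example:
# Hypothetical check: list the files of one model under the PFS mount
ls /mnt/models/aiak-huggingface/llama-2-7b-hf/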
2. Start a test instance with Docker
(For production-scale use, deploy through the standard Kubernetes workflow instead.)
# Start the container
docker run --gpus all -itd --name infer_test --shm-size=32768m --privileged --user=root --network host -v /PATH/TO/MODEL:/mnt/model registry.baidubce.com/cce-ai-native/aiak-inference-llm:1.2.4.4 /bin/bash
# Enter the container
docker exec -it infer_test bash
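Before moving on, it can be worth confirming that the GPUs and the mounted model directory are visible inside the container. A minimal check, assuming the container name and mount path from the command above:
# Inside the container: list the GPUs and confirm the model mount is readable
nvidia-smi
ls /mnt/model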
3. Quantize the model with the quantization tool
# Change to the quantization tool directory
cd /workspace/aiak-model-quant-tool/
# Examples
weight_only_int8 quantization
python3 model_quantization.py -i /input_path/ -o ./output_path/ -tp 1 -quant_type weight_only_int8 -t fp16
smooth_quant quantization
python3 model_quantization.py -i /input_path/ -o ./output_path/ -tp 1 -quant_type smooth_quant -t fp16 -sq 0.75
# Parameters
-i    Input path of the original model weights
-o    Output path for the quantized model weights
-quant_type    Quantization algorithm; supports weight_only_int8 or smooth_quant
-tp    Number of GPUs the service will be deployed on; supports 1/2/4/8
-t    Storage data type for the non-quantized parts (fp16 or bf16)
-sq    smooth_quant smoother parameter, a decimal in the range [0, 1] (default 0.8 for llama, 0.75 for glm130)
-token    For special cases, path to the token_ids required by smooth_quant quantization
--multi-query-mode    Whether to use multi-query attention (for smooth_quant)
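For illustration, here is a hedged example that quantizes a model for a 2-GPU deployment (the paths are placeholders). The -tp value should match the --num_gpus used when the service is launched in the next step.
# Hypothetical example: weight_only_int8 quantization for a 2-GPU deployment
python3 model_quantization.py -i /input_path/ -o ./output_path/ -tp 2 -quant_type weight_only_int8 -t fp16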
# Notes
Models supported by the weight_only_int8 algorithm:
llama-7b-hf, llama-13b-hf, llama-65b-hf, llama-2-7b-hf, llama-2-13b-hf, llama-2-70b-hf, baichuan1-7b/13b, chatglm2-6b, qwen-14b-chat, qwen-72b-chat, aquila, baichuan, glm130
Models supported by the smooth_quant algorithm (note: for the non-quantized data type, bf16 is currently converted to fp16):
llama-7b-hf, llama-13b-hf, llama-65b-hf, llama-2-7b-hf, llama-2-13b-hf, llama-2-70b-hf, GLM130b, baichuan7b, baichuan13b, qwen-14b, qwen72b
Estimated quantization time: depends on the model size and the number of GPUs, ranging from roughly ten minutes to about an hour.
4. Start the service
# Example of the launch-script parameters inside the image
bash run_triton_v2.sh --model_name=llama --ckpt_path=/ckpt_path/ --data_type=fp16 --quant_mode=weight_only_int8
# Parameter meanings and default values:
--model_name: name used to identify the model; required. Supports llama, llama2, chatglm2, baichuan, glm, and qwen.
--num_gpus: number of GPUs to use; defaults to 1.
--ckpt_path: path to the checkpoint; must not be empty.
--data_type: data type; fp16 or fp32.
--batch_size: batch size; defaults to 8.
--tokenizer_path: path to the tokenizer; defaults to ckpt_path.
--extension_path: extension path; defaults to /dev/, which means no extension is used.
--quant_mode: whether to enable quantization; leave unset to disable, or set to weight_only_int8 or smooth_quant.
--enforce_eager: force eager-mode PyTorch; defaults to True. If set to False, CUDA graph and eager mode are used together.
--gpu_memory_utilization: fraction of GPU memory used during inference; defaults to 0.99. If CUDA graph is enabled, you may need to lower this value.
--grpc_port: gRPC port of the Triton server.
--http_port: HTTP port of the Triton server.
--metrics_port: metrics port of the Triton server.
--log_verbose: enables verbose logging when set to true.
--task_type: causal_lm | sequence_classification; defaults to causal_lm. Selects the model type: generative (default) or discriminative.
Note: only Hugging Face checkpoints are currently supported. To use a Megatron checkpoint, convert it to the HF format first.
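As a sketch, here is a hypothetical launch that sets several of the optional flags explicitly; the ports below are the Triton defaults assumed by the client script in section 5, and the paths are placeholders:
# Hypothetical example: 2-GPU serving with explicit Triton ports and tokenizer path
bash run_triton_v2.sh --model_name=llama2 --num_gpus=2 --ckpt_path=/ckpt_path/ --tokenizer_path=/ckpt_path/ --data_type=fp16 --quant_mode=weight_only_int8 --grpc_port=8001 --http_port=8000 --metrics_port=8002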
5. Send requests
Start a client container from the image, then use the client script below to send streaming gRPC requests.
Step 1: Prepare the client script
#!/usr/bin/env python3
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import google.protobuf.json_format
import json
import multiprocessing as mp
import numpy as np
import time
import tritonclient.grpc as grpcclient
import tritonclient.http as httpclient
from argparse import ArgumentParser
from collections.abc import Mapping
from functools import partial
from tritonclient.grpc.service_pb2 import ModelInferResponse
from tritonclient.utils import np_to_triton_dtype
def deep_update(source, overrides):
    """
    Update a nested dictionary or similar mapping.
    Modify ``source`` in place.
    """
    for key, value in overrides.items():
        if isinstance(value, Mapping) and value:
            returned = deep_update(source.get(key, {}), value)
            source[key] = returned
        else:
            source[key] = overrides[key]
    return source


def parse_args():
    parser = ArgumentParser()
    parser.add_argument("request_file", nargs="?", default=None, metavar="request-file")
    parser.add_argument("--params")
    parser.add_argument("-t", "--test", action="store_true")
    args = parser.parse_args()
    return args


def generate_parameters(args):
    DEFAULT_CONFIG = {
        'protocol': 'http',
        'url': None,
        'model_name': 'fastertransformer',
        'verbose': False,
        'stream_api': False,
    }
    params = {'config': DEFAULT_CONFIG, 'request': []}

    if args.request_file is not None:
        with open(args.request_file) as f:
            file_params = json.load(f)
        deep_update(params, file_params)

    args_params = json.loads(args.params) if args.params else {}
    deep_update(params, args_params)

    for index, value in enumerate(params['request']):
        params['request'][index] = {
            'name': value['name'],
            'data': np.array(value['data'], dtype=value['dtype']),
        }

    if params['config']['url'] is None:
        if params['config']['protocol'] == 'grpc':
            params['config']['url'] = 'localhost:8001'
        else:
            params['config']['url'] = 'localhost:8000'

    return params['config'], params['request']


def prepare_tensor(client, name, input):
    t = client.InferInput(name, input.shape, np_to_triton_dtype(input.dtype))
    t.set_data_from_numpy(input)
    return t


def stream_consumer(queue, test: bool):
    start_time = None
    tokens_before = np.array([], dtype=np.int32)

    while True:
        result = queue.get()
        if result is None:
            break

        if isinstance(result, float):
            start_time = result
            continue

        message = ModelInferResponse()
        google.protobuf.json_format.Parse(json.dumps(result), message)
        result = grpcclient.InferResult(message)

        tokens = result.as_numpy("OUTPUT_0")
        is_finished = result.as_numpy("is_finished")
        string = [token.decode() for token in tokens]
        print(repr(string[0])[1:-1])
        if test:
            assert len(tokens) == len(tokens_before) + 1
            assert np.array_equal(tokens[:-1], tokens_before)
        tokens_before = tokens


def stream_callback(queue, result, error):
    if error:
        queue.put(error)
    else:
        queue.put(result.get_response(as_json=True))


def main_stream(config, request, test: bool):
    client_type = grpcclient
    kwargs = {"verbose": config["verbose"]}

    result_queue = mp.Queue()
    consumer = mp.Process(target=stream_consumer, args=(result_queue, test))
    consumer.start()

    with grpcclient.InferenceServerClient(config['url'], verbose=config["verbose"]) as cl:
        payload = [prepare_tensor(grpcclient, field['name'], field['data'])
                   for field in request]

        cl.start_stream(callback=partial(stream_callback, result_queue))
        result_queue.put(time.perf_counter())
        cl.async_stream_infer(config['model_name'], payload)

    result_queue.put(None)
    consumer.join()


def main_sync(config, request):
    is_http = config['protocol'] == 'http'
    client_type = httpclient if is_http else grpcclient

    kwargs = {"verbose": config["verbose"]}
    if is_http:
        kwargs["concurrency"] = 10

    with client_type.InferenceServerClient(config['url'], **kwargs) as cl:
        payload = [prepare_tensor(client_type, field['name'], field['data'])
                   for field in request]
        result = cl.infer(config['model_name'], payload)

    if is_http:
        for output in result.get_response()['outputs']:
            print("{}:\n{}\n".format(output['name'], result.as_numpy(output['name'])))
    else:
        for output in result.get_response().outputs:
            print("{}:\n{}\n".format(output.name, result.as_numpy(output.name)))


if __name__ == "__main__":
    args = parse_args()
    config, request = generate_parameters(args)
    if not config['stream_api']:
        main_sync(config, request)
    else:
        main_stream(config, request, args.test)
Step 2: Edit the request content by modifying the sample_request_ensemble.json file.
{
"config": {
"model_name": "ensemble",
"protocol": "grpc",
"stream_api": true
},
"request": [
{
"name": "INPUT_0",
"data": [["Malika Louback believes her three engineering degrees make her"]],
"dtype": "object"
},
{
"name": "INPUT_1",
"data": [[1024]],
"dtype": "uint32"
},
{
"name": "beam_search_diversity_rate",
"data": [[0.0]],
"dtype": "float32"
},
{
"name": "temperature",
"data": [[0.0]],
"dtype": "float32"
},
{
"name": "len_penalty",
"data": [[1.0]],
"dtype": "float32"
},
{
"name": "repetition_penalty",
"data": [[1.0]],
"dtype": "float32"
},
{
"name": "random_seed",
"data": [[0]],
"dtype": "uint64"
},
{
"name": "is_return_log_probs",
"data": [[true]],
"dtype": "bool"
},
{
"name": "beam_width",
"data": [[1]],
"dtype": "uint32"
},
{
"name": "runtime_top_k",
"data": [[-1]],
"dtype": "int32"
},
{
"name": "runtime_top_p",
"data": [[1.0]],
"dtype": "float32"
},
{
"name": "start_id",
"data": [[1]],
"dtype": "uint32"
},
{
"name": "end_id",
"data": [[2002]],
"dtype": "uint32"
},
{
"name": "INPUT_2",
"data": [[""]],
"dtype": "object"
},
{
"name": "INPUT_3",
"data": [[""]],
"dtype": "object"
}
]
}
Here, INPUT_0 is the request text, INPUT_1 is the maximum number of tokens the LLM should generate, and INPUT_3 is the end_words (stop words). Save the file after editing.
Step 3: Send the request with python3 issue_request.py sample_request_ensemble.json; the expected output will then be printed.
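The client also accepts a --params argument whose JSON is merged over the request file, which is convenient for quick overrides without editing the file. For example (a sketch; the override keys come from the config block shown above):
# Override the protocol and disable streaming for a one-off synchronous request
python3 issue_request.py sample_request_ensemble.json --params '{"config": {"protocol": "http", "stream_api": false}}'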
Appendix 1: Whisper model example
"""
Example code to run OpenAI Whisper
"""
from vllm import SamplingParams, LLM
from transformers import WhisperProcessor
from evaluate import load
from tqdm import tqdm
from datasets import load_dataset
import sys
if len(sys.argv) != 4:
    print(f'Usage: python3 {sys.argv[0]} /path/to/model /path/to/dataset /path/to/wer')
    exit(0)
path_to_model = sys.argv[1]
path_to_dataset = sys.argv[2]
wer_path = sys.argv[3]
librispeech_test_clean = load_dataset(path_to_dataset, "clean", split="test")
processor = WhisperProcessor.from_pretrained(path_to_model)
llm = LLM(model=path_to_model, gpu_memory_utilization=0.4)
sampling_params = SamplingParams(temperature=0.0, max_tokens=1024)
references = []
results = []
batch_size = 32
batch_features = []
def run_samples(input_features, sampling_params, pbar):
    """
    Run processing with batch
    """
    results = []
    outputs = llm.generate(input_features=input_features,
                           sampling_params=sampling_params,
                           use_tqdm=False)
    for output in outputs:
        generated_text = output.outputs[0].text
        generated_text = processor.tokenizer._normalize(generated_text)
        results.append(generated_text)
        pbar.update(1)
    return results
pbar = tqdm(desc='samples', total=len(librispeech_test_clean))
for batch in librispeech_test_clean:
    audio = batch['audio']
    # append reference
    references.append(processor.tokenizer._normalize(batch['text']))
    input_features = processor(audio["array"], sampling_rate=audio["sampling_rate"], return_tensors="pt").input_features
    input_features = input_features.half().cuda()
    batch_features.append(input_features)
    if len(batch_features) == batch_size:
        # actual run batch
        results += run_samples(batch_features, sampling_params, pbar)
        batch_features = []

# process remaining batch
if len(batch_features) > 0:
    results += run_samples(batch_features, sampling_params, pbar)
pbar.close()
print(f'processed sample: {len(references)}')
# print(f'results: {results}')
# print(f'references: {references}')
wer = load(wer_path)
print(f'wer: {100 * wer.compute(references=references, predictions=results):.2f} %')
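Assuming the script above is saved as run_whisper.py (the filename is arbitrary) and that the model, dataset, and WER metric paths below are placeholders, it is invoked exactly as its usage message describes:
# Hypothetical invocation: Whisper model directory, LibriSpeech dataset path, WER metric path
python3 run_whisper.py /mnt/models/whisper-large-v2 /mnt/datasets/librispeech_asr /mnt/metrics/wer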
Appendix 2: Explanation of selected sample_request_ensemble.json fields
| Name | Usage | Description |
| --- | --- | --- |
| INPUT_0 | input_ids | The input ids (context) |
| INPUT_1 | input_lengths | The maximum number of output token ids |
| INPUT_3 | stop_words_list | Optional. When the model generates a word in this list, it stops generating. An extension of stop ids |
Note: all fields in sample_request_ensemble.json must use the same batch_size.
{
"config": {
"model_name": "ensemble",
"protocol": "grpc",
"stream_api": true
},
"request": [
{
"name": "INPUT_0",
"data": [["Tell me about your self"],
["Who are you"],
["Malika Louback believes her three engineering degrees make her"],
["Racer's hurricane in 1837 was named after"],
["Jason Robertson is only the second Filipino American to"],
["The 1886 song of thanks \"Ein Danklied sei dem Herrn\" was performed at"],
["Fori Nehru founded an employment campaign in 1947 to sell"],
["1+1="]],
"dtype": "object"
},
{
"name": "INPUT_1",
"data": [[24], [24], [24], [24], [24], [24], [24], [24]],
"dtype": "uint32"
},
{
"name": "beam_search_diversity_rate",
"data": [[0], [0], [0], [0], [0], [0], [0], [0]],
"dtype": "float32"
},
{
"name": "temperature",
"data": [[1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0]],
"dtype": "float32"
},
{
"name": "len_penalty",
"data": [[1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0]],
"dtype": "float32"
},
{
"name": "repetition_penalty",
"data": [[1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0]],
"dtype": "float32"
},
{
"name": "random_seed",
"data": [[0], [0], [0], [0], [0], [0], [0], [0]],
"dtype": "uint64"
},
{
"name": "is_return_log_probs",
"data": [[true], [true], [true], [true], [true], [true], [true], [true]],
"dtype": "bool"
},
{
"name": "beam_width",
"data": [[1], [1], [1], [1], [1], [1], [1], [1]],
"dtype": "uint32"
},
{
"name": "runtime_top_k",
"data": [[1], [1], [1], [1], [1], [1], [1], [1]],
"dtype": "int32"
},
{
"name": "runtime_top_p",
"data": [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]],
"dtype": "float32"
},
{
"name": "start_id",
"data": [[1], [1], [1], [1], [1], [1], [1], [1]],
"dtype": "uint32"
},
{
"name": "end_id",
"data": [[2], [2], [2], [2], [2], [2], [2], [2]],
"dtype": "uint32"
},
{
"name": "INPUT_2",
"data": [[""],
[""],
[""],
[""],
[""],
[""],
[""],
[""]],
"dtype": "object"
},
{
"name": "INPUT_3",
"data": [[""],
[""],
[""],
[""],
[""],
[""],
[""],
[""]],
"dtype": "object"
}
]
}
| Field | Type | Required | Default | Description | Constraints |
| --- | --- | --- | --- | --- | --- |
| beam_search_diversity_rate | float | Yes | 1.0 | Diversity rate for beam search | |
| temperature | float | Yes | 1.0 | Controls how "soft" or "sharp" the output probability distribution is | |
| len_penalty | float | Yes | 1.0 | Balances the length of the generated text against its quality | |
| is_return_log_probs | bool | Yes | false | Whether to return log probs | |
| beam_width | int | Yes | 1 | Beam width | |
| runtime_top_k | int | Yes | 1 | top_k sampling parameter | The dtype of runtime_top_k changed from uint32 to int32 |
| runtime_top_p | float | Yes | 1.0 | top_p sampling parameter | |
| start_id | int | Yes | 0 | start_id of the tokenizer | |
| end_id | int | Yes | 0 | end_id of the tokenizer | |
| INPUT_0 | string | Yes | empty | Input string (context) | |
| INPUT_1 | int | Yes | 0 | Maximum length of the output ids | |
| output_seq_len | uint32_t | Yes | 0 | Maximum number of tokens you want in the result; note that it includes the input length | |
| INPUT_3 | string | Yes | empty | Optional. When a word in this list is generated, generation stops. An extension of stop ids, i.e. the stop words list | |
| repetition_penalty | float | Yes | 1.0 | Optional. Applies a repetition penalty to the logits for beam search and sampling. Mutually exclusive with presence_penalty | |
| random_seed | unsigned long long int | Yes | 0 | Optional. Random seed used to initialize the random table for sampling | |
| OUTPUT_0 | string | No | | Output string | |
| sequence_length | int | No | | Length of the output ids | |
| cum_log_probs | float | No | | Cumulative log probability of the generated sentence | |
| output_log_probs | float | No | | Records the log probability of the logits at each sampling step | |
Appendix 3: extension plugin
Sample code is shown below; you can modify the function implementations to meet custom needs, for example to customize the model's preprocess and postprocess flows.
Note: the extension plugin files must be named vllm_extension.py and template.json.
The content of extension/vllm_extension.py is as follows:
#!/usr/bin/env python3
from transformers import AutoTokenizer
from typing import List
import json


class VllmExtension:
    def __init__(self):
        # No constructor arguments are allowed, because the instance is built automatically
        # and nothing can be passed in; the config can instead be read from the current directory.
        import pathlib
        folder_path = pathlib.Path(__file__).parent.resolve()
        with open(f'{folder_path}/template.json', encoding='utf-8') as f:
            self.template = json.load(f)

    def get_tokenizer(self, tokenizer_path: str) -> AutoTokenizer:
        return AutoTokenizer.from_pretrained(tokenizer_path, trust_remote_code=True, use_fast=False)

    def text_to_tokens(self, tokenizer: AutoTokenizer, text: str) -> List[int]:
        text = self.template['prompt'].replace('#CONTENT#', text)
        prompt_tokens = tokenizer.encode(text)
        return prompt_tokens

    def tokens_to_text(self, tokenizer: AutoTokenizer, output_tokens: List[int]) -> str:
        return tokenizer.decode(output_tokens, skip_special_tokens=False, clean_up_tokenization_spaces=False)
The content of extension/template.json is as follows:
{
"prompt": "[Round 1]\n\n问:#CONTENT#\n\n答:"
}
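To have the service load this plugin, pass the directory containing vllm_extension.py and template.json through the --extension_path flag described in section 4. A hedged example (the /workspace/extension/ path and the model name are placeholders):
# Hypothetical example: launch the service with a custom extension directory
bash run_triton_v2.sh --model_name=chatglm2 --ckpt_path=/ckpt_path/ --data_type=fp16 --extension_path=/workspace/extension/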