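Jina-serve is a framework for building and deploying AI services that communicate over gRPC, HTTP, and WebSockets. It scales services from local development to production, so you can focus on your core logic.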
pip install jina
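See the installation guides for Apple Silicon and Windows.
Three main layers:
- Data: BaseDoc and DocList for input/output
- Serving: an Executor processes documents, a Gateway connects services
- Orchestration: a Deployment serves an Executor, a Flow builds pipelines
Below is an example gRPC AI service based on StableLM: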
from jina import Executor, requests
from docarray import DocList, BaseDoc
from transformers import pipeline


class Prompt(BaseDoc):
    text: str


class Generation(BaseDoc):
    prompt: str
    text: str


class StableLM(Executor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Load the model once per Executor instance
        self.generator = pipeline(
            'text-generation', model='stabilityai/stablelm-base-alpha-3b'
        )

    @requests
    def generate(self, docs: DocList[Prompt], **kwargs) -> DocList[Generation]:
        generations = DocList[Generation]()
        prompts = docs.text
        llm_outputs = self.generator(prompts)
        for prompt, output in zip(prompts, llm_outputs):
            # For a list of prompts the pipeline returns one list of candidates
            # per prompt; keep the text of the first candidate
            generations.append(
                Generation(prompt=prompt, text=output[0]['generated_text'])
            )
        return generations
Deploy with Python or YAML:
from jina import Deployment
from executor import StableLM
dep = Deployment(uses=StableLM, timeout_ready=-1, port=12345)
with dep:
    dep.block()
jtype: Deployment
with:
  uses: StableLM
  py_modules:
    - executor.py
  timeout_ready: -1
  port: 12345
Query the service with the client:
from jina import Client
from docarray import DocList
from executor import Prompt, Generation
prompt = Prompt(text='suggest an interesting image generation prompt')
client = Client(port=12345)
response = client.post('/', inputs=[prompt], return_type=DocList[Generation])
Chain multiple services into a Flow:
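from jina import Flow
from executor import StableLM
from text_to_image import TextToImage  # assumes TextToImage is defined in text_to_image.py

flow = Flow(port=12345).add(uses=StableLM).add(uses=TextToImage)
with flow:
    flow.block()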
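The chaining pattern above can be illustrated without Jina installed. The following is a minimal sketch, assuming a Flow behaves like an ordered list of stages where each stage receives the previous stage's output. `MiniFlow`, `fake_llm`, and `fake_text_to_image` are hypothetical names made up for this illustration and are not part of Jina.

```python
class MiniFlow:
    """Hypothetical stand-in for a pipeline builder; not part of Jina."""

    def __init__(self):
        self.stages = []

    def add(self, stage):
        self.stages.append(stage)
        return self  # returning self is what makes .add(...).add(...) chainable

    def run(self, docs):
        # Pass the documents through every stage in the order it was added
        for stage in self.stages:
            docs = stage(docs)
        return docs


def fake_llm(prompts):
    # Placeholder for a text-generation stage
    return [f"{p} -> a castle at sunset" for p in prompts]


def fake_text_to_image(texts):
    # Placeholder for an image-generation stage
    return [f"<image for: {t}>" for t in texts]


mini_flow = MiniFlow().add(fake_llm).add(fake_text_to_image)
result = mini_flow.run(["suggest a prompt"])
print(result)
```

A real Flow also handles networking, serialization, and process management, none of which this sketch attempts to model.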
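Boost throughput with built-in features:
- Replicas for parallel processing
- Shards for data partitioning
- Dynamic batching for more efficient model inference
Example of scaling a Stable Diffusion deployment: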
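To make the difference between replicas and shards concrete, here is a conceptual sketch in plain Python. It assumes replicas are interchangeable copies that can each take any request, while shards each own a fixed slice of the data. The function names and the CRC32-based partitioning are assumptions chosen for illustration and do not describe Jina's internal routing.

```python
from itertools import cycle
from zlib import crc32


def dispatch_round_robin(requests, num_replicas):
    """Replicas: any replica can serve any request, so spread them evenly."""
    assignment = {r: [] for r in range(num_replicas)}
    replica_ids = cycle(range(num_replicas))
    for request in requests:
        assignment[next(replica_ids)].append(request)
    return assignment


def dispatch_by_shard(doc_ids, num_shards):
    """Shards: each shard owns part of the data, selected by a stable hash."""
    assignment = {s: [] for s in range(num_shards)}
    for doc_id in doc_ids:
        shard = crc32(doc_id.encode("utf-8")) % num_shards
        assignment[shard].append(doc_id)
    return assignment


replicated = dispatch_round_robin(["a", "b", "c", "d", "e"], num_replicas=2)
sharded = dispatch_by_shard(["doc-1", "doc-2", "doc-3", "doc-4"], num_shards=2)
print(replicated)
print(sharded)
```

Round-robin dispatch balances load across identical workers, whereas hash-based partitioning guarantees that the same document always lands on the same shard.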
jtype: Deployment
with:
  uses: TextToImage
  timeout_ready: -1
  py_modules:
    - text_to_image.py
  env:
    CUDA_VISIBLE_DEVICES: RR
  replicas: 2
  uses_dynamic_batching:
    /default:
      preferred_batch_size: 10
      timeout: 200
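The dynamic batching settings in this configuration can be pictured with a small asyncio sketch. It assumes `timeout` is measured in milliseconds and that a batch is flushed either when it reaches `preferred_batch_size` or when the timeout expires, whichever comes first. `collect_batch` is a hypothetical helper written for this illustration, not Jina's implementation.

```python
import asyncio


async def collect_batch(queue, preferred_batch_size=10, timeout_ms=200):
    """Wait for one item, then gather more until the batch is full or time runs out."""
    batch = [await queue.get()]
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_ms / 1000
    while len(batch) < preferred_batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # timeout reached: flush a partial batch
    return batch


async def demo():
    queue = asyncio.Queue()
    for i in range(13):
        queue.put_nowait(f"prompt-{i}")
    full = await collect_batch(queue)                    # 10 items are ready at once
    partial = await collect_batch(queue, timeout_ms=50)  # only 3 remain
    return full, partial


full_batch, partial_batch = asyncio.run(demo())
print(len(full_batch), len(partial_batch))
```

A larger batch size generally improves accelerator utilization, while a shorter timeout bounds the extra latency a lone request pays while waiting for companions.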
TextToImage/
├── executor.py
├── config.yml
├── requirements.txt
# config.yml
jtype: TextToImage
py_modules:
  - executor.py
metas:
  name: TextToImage
  description: Text to Image generation Executor
jina hub push TextToImage
jina export kubernetes flow.yml ./my-k8s
kubectl apply -R -f my-k8s
jina export docker-compose flow.yml docker-compose.yml
docker-compose up
Deploy with a single command:
jina cloud deploy jcloud-flow.yml
Stream output token by token to build responsive LLM applications:
from docarray import BaseDoc


class PromptDocument(BaseDoc):
    prompt: str
    max_tokens: int


class ModelOutputDocument(BaseDoc):
    token_id: int
    generated_text: str
import torch
from jina import Executor, requests
from transformers import GPT2Tokenizer, GPT2LMHeadModel

# Shared tokenizer, loaded once at import time
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')


class TokenStreamingExecutor(Executor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.model = GPT2LMHeadModel.from_pretrained('gpt2')

    @requests(on='/stream')
    async def task(self, doc: PromptDocument, **kwargs) -> ModelOutputDocument:
        input = tokenizer(doc.prompt, return_tensors='pt')
        input_len = input['input_ids'].shape[1]
        for _ in range(doc.max_tokens):
            # Generate exactly one new token per iteration
            output = self.model.generate(**input, max_new_tokens=1)
            if output[0][-1] == tokenizer.eos_token_id:
                break
            yield ModelOutputDocument(
                token_id=int(output[0][-1]),
                generated_text=tokenizer.decode(
                    output[0][input_len:], skip_special_tokens=True
                ),
            )
            # Feed the extended sequence back in for the next step
            input = {
                'input_ids': output,
                'attention_mask': torch.ones(1, len(output[0])),
            }
# Server
from jina import Deployment

with Deployment(uses=TokenStreamingExecutor, port=12345, protocol='grpc') as dep:
    dep.block()
# Client
import asyncio
from jina import Client


async def main():
    client = Client(port=12345, protocol='grpc', asyncio=True)
    async for doc in client.stream_doc(
        on='/stream',
        inputs=PromptDocument(prompt='what is the capital of France ?', max_tokens=10),
        return_type=ModelOutputDocument,
    ):
        print(doc.generated_text)


asyncio.run(main())
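The streaming pattern itself, an async generator on the producer side and `async for` on the consumer side, can be tried without a model or a running server. This is a minimal sketch using only the standard library. `TokenChunk` and `stream_tokens` are hypothetical names, and the "model" simply echoes the words of the prompt one at a time.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class TokenChunk:
    token_id: int
    generated_text: str


async def stream_tokens(prompt, max_tokens):
    """Yield the text produced so far after each new 'token' (here: one word)."""
    generated = []
    for token_id, word in enumerate(prompt.split()[:max_tokens]):
        await asyncio.sleep(0)  # hand control back to the event loop
        generated.append(word)
        yield TokenChunk(token_id=token_id, generated_text=" ".join(generated))


async def consume():
    chunks = []
    # Each chunk arrives as soon as it is yielded, not after the full response
    async for chunk in stream_tokens("Paris is the capital of France", max_tokens=3):
        chunks.append(chunk.generated_text)
    return chunks


streamed = asyncio.run(consume())
print(streamed)
```

Because every yielded chunk carries the full text generated so far, a client can simply redraw its output on each chunk instead of concatenating fragments.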
Jina-serve is backed by Jina AI and licensed under Apache-2.0.