OA0
OA0 是一个探索 AI 的社区
现在注册
已注册用户请  登录
OA0  ›  代码  ›  Pathway — 实时数据管道与 RAG 应用构建引擎

Pathway — 实时数据管道与 RAG 应用构建引擎

 
  board ·  2026-02-11 14:49:17 · 7 次点击  · 0 条评论  

ubuntu
Last release PyPI version PyPI downloads License: BSL
chat on Discord follow on Twitter follow on LinkedIn Awesome Python Pathway Guru
快速开始 | 部署 | 文档与支持 | 博客 | 许可证

Pathway 实时数据框架

Pathway 是一个用于流处理、实时分析、LLM 管道和 RAG 的 Python ETL 框架。

Pathway 提供易于使用的 Python API,让你可以无缝集成你喜爱的 Python ML 库。
Pathway 代码通用且健壮:你可以在开发和生产环境中使用它,有效处理批处理和流式数据
同一份代码可用于本地开发、CI/CD 测试、运行批处理作业、处理流重放以及处理数据流。

Pathway 由基于 Differential Dataflow 的可扩展 Rust 引擎驱动,并执行增量计算。
你的 Pathway 代码虽然用 Python 编写,但由 Rust 引擎运行,支持多线程、多进程和分布式计算。
整个管道都保存在内存中,可以轻松地使用 Docker 和 Kubernetes 进行部署。

你可以使用 pip 安装 Pathway:

pip install -U pathway

如有任何问题,你可以在 Discord 上找到项目背后的社区和团队。

用例与模板

准备好了解 Pathway 能做什么了吗?

试试我们易于运行的示例吧!

这些即开即用的示例提供 notebook 和 docker 两种格式,只需点击几下即可启动。选择一个,立即开始你的 Pathway 实践体验!

事件处理与实时分析管道

凭借其批流一体的统一引擎和完全的 Python 兼容性,Pathway 让数据处理变得尽可能简单。它是各种数据处理管道的理想解决方案,包括:

AI 管道

Pathway 提供专门的 LLM 工具来构建实时的 LLM 和 RAG 管道。它包含了最常见 LLM 服务的包装器和实用工具,使得使用 LLM 和 RAG 管道变得异常简单。查看我们的 LLM xpack 文档

请务必尝试我们包含 LLM 工具的可运行示例。
你可以在此处找到这些示例这里

特性

  • 广泛的连接器:Pathway 自带连接器,可连接到外部数据源,如 Kafka、GDrive、PostgreSQL 或 SharePoint。其 Airbyte 连接器允许你连接到 300 多个不同的数据源。如果你需要的连接器不可用,可以使用 Pathway Python 连接器构建自己的自定义连接器。
  • 无状态和有状态转换:Pathway 支持有状态转换,如连接、窗口化和排序。它提供了许多直接在 Rust 中实现的转换。除了提供的转换,你还可以使用任何 Python 函数。你可以实现自己的函数,也可以使用任何 Python 库来处理数据。
  • 持久化:Pathway 提供持久化功能来保存计算状态。这允许你在更新或崩溃后重新启动管道。你的管道在 Pathway 的掌控之中!
  • 一致性:Pathway 为你处理时间,确保所有计算都是一致的。特别是,Pathway 通过在有新(或延迟)数据点进入系统时更新其结果来处理延迟和乱序数据点。Pathway 免费版提供“至少一次”一致性,而企业版提供“恰好一次”一致性。
  • 可扩展的 Rust 引擎:借助 Pathway Rust 引擎,你不再受 Python 常见限制的束缚。你可以轻松地进行多线程、多进程和分布式计算。
  • LLM 助手:Pathway 提供了一个 LLM 扩展,包含将 LLM 与数据管道集成的所有实用工具(LLM 包装器、解析器、嵌入器、分割器),包括一个内存中的实时向量索引,以及与 LLamaIndex 和 LangChain 的集成。你可以使用你的实时文档快速构建和部署 RAG 应用。

快速开始

安装

Pathway 需要 Python 3.10 或更高版本。

你可以使用 pip 安装当前版本的 Pathway:

$ pip install -U pathway

⚠️ Pathway 可在 MacOS 和 Linux 上使用。其他系统的用户应在虚拟机上运行 Pathway。

示例:实时计算正数之和。

import pathway as pw

# 定义数据模式(可选)
class InputSchema(pw.Schema):
  value: int

# 使用连接器连接到你的数据
input_table = pw.io.csv.read(
  "./input/",
  schema=InputSchema
)

# 定义对数据的操作
filtered_table = input_table.filter(input_table.value>=0)
result_table = filtered_table.reduce(
  sum_value = pw.reducers.sum(filtered_table.value)
)

# 将结果加载到外部系统
pw.io.jsonlines.write(result_table, "output.jsonl")

# 运行计算
pw.run()

Google Colab 中运行 Pathway。

你可以在这里找到更多示例。

部署

本地运行

要使用 Pathway,你只需要导入它:

import pathway as pw

现在,你可以轻松创建处理管道,并让 Pathway 处理更新。一旦管道创建完成,你可以用一行命令在流数据上启动计算:

pw.run()

然后,你可以像运行普通 Python 脚本一样运行你的 Pathway 项目(例如 main.py):$ python main.py
Pathway 附带一个监控仪表板,允许你跟踪每个连接器发送的消息数量和系统延迟。仪表板还包括日志消息。

Pathway dashboard

或者,你可以使用 pathway 风格的方式:

$ pathway spawn python main.py

Pathway 原生支持多线程。
要使用 3 个线程启动你的应用程序,可以这样做:

$ pathway spawn --threads 3 python main.py

要快速启动一个 Pathway 项目,你可以使用我们的 cookiecutter 模板

Docker

你可以轻松地使用 Docker 运行 Pathway。

Pathway 镜像

你可以使用 Pathway Docker 镜像,通过 Dockerfile:

FROM pathwaycom/pathway:latest

WORKDIR /app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD [ "python", "./your-script.py" ]

然后你可以构建并运行 Docker 镜像:

docker build -t my-pathway-app .
docker run -it --rm --name my-pathway-app my-pathway-app

运行单个 Python 脚本

对于单文件项目,创建一个完整的 Dockerfile 可能显得不必要。在这种情况下,你可以直接使用 Pathway Docker 镜像执行 Python 脚本。例如:

docker run -it --rm --name my-pathway-app -v "$PWD":/app pathwaycom/pathway:latest python my-pathway-app.py

Python docker 镜像

你也可以使用标准的 Python 镜像,并通过 pip 安装 Pathway,使用 Dockerfile:

FROM --platform=linux/x86_64 python:3.10

RUN pip install -U pathway
COPY ./pathway-script.py pathway-script.py

CMD ["python", "-u", "pathway-script.py"]

Kubernetes 和云

Docker 容器非常适合在云上使用 Kubernetes 进行部署。
如果你想扩展你的 Pathway 应用,你可能会对我们的 Pathway for Enterprise 感兴趣。
Pathway for Enterprise 专门为端到端数据处理和实时智能分析量身定制。
它利用云上的分布式计算进行扩展,并支持分布式 Kubernetes 部署,以及外部持久化设置。

你可以轻松地使用 Render 等服务部署 Pathway:参见如何在几次点击中部署 Pathway

如果你有兴趣,请随时联系我们了解更多信息。

性能

Pathway 旨在超越为流式和批处理数据任务设计的先进技术,包括:Flink、Spark 和 Kafka Streaming。它还使得在流模式下实现许多其他流框架不易支持的算法/UDF成为可能(尤其是:时间连接、迭代图算法、机器学习例程)。

如果你好奇,这里有一些可供试用的基准测试

WordCount Graph

文档与支持

Pathway 的完整文档可在 pathway.com/developers/ 获取,包括 API 文档

如果你有任何问题,请随时在 GitHub 上提交问题,加入我们的 Discord,或发送邮件至 contact@pathway.com

🤝 特色合作与集成

我们构建前沿的数据处理管道,并共同推广那些突破 Python 和流数据可能性的解决方案。
认识一下帮助我们塑造数据工程未来的人们。

| 项目 | 描述 | | ------------ | ----------- | | [Databento](https://databento.com/blog/option-greeks) | 获取市场数据的更简单、更快速的方式。 | | [LangChain](https://docs.langchain.com/oss/python/integrations/vectorstores/pathway) | LangChain 是智能体工程平台。 | | [LlamaIndex](https://developers.llamaindex.ai/python/examples/retrievers/pathway_retriever/) | 开发者信赖的构建上下文感知 AI 智能体的框架。 | | [MinIO](https://www.min.io/) | MinIO 是一个高性能、兼容 S3 的对象存储,在 GNU AGPLv3 许可证下开源。 | | [PaddleOCR](https://github.com/PaddlePaddle/PaddleOCR) | PaddleOCR 是行业领先、生产就绪的 OCR 和文档 AI 引擎,提供从文本提取到智能文档理解的端到端解决方案。 | | [Redpanda](https://www.redpanda.com/blog/replace-kafka-redpanda-data-analysis-streaming) | 在没有 Kafka 复杂性的情况下构建、运营和管理流式及 AI 应用。 |

许可证

Pathway 基于 BSL 1.1 许可证分发,允许无限的非商业使用,以及大多数商业用途免费使用 Pathway 包。此仓库中的代码在 4 年后自动转换为开源(Apache 2.0 许可证)。一些与此仓库互补的公共仓库(示例、库、连接器等)以开源许可证(MIT 许可证)授权。

贡献指南

如果你开发了一个希望与此仓库集成的库或连接器,我们建议首先将其作为单独的仓库以 MIT/Apache 2.0 许可证发布。

对于所有关于核心 Pathway 功能的问题,鼓励提交 Issues。如需更多信息,请随时与 Pathway 的 Discord 社区互动。

7 次点击  ∙  0 人收藏  
登录后收藏  
0 条回复
关于 ·  帮助 ·  PING ·  隐私政策 ·  服务条款   
OA0 - Omni AI 0 一个探索 AI 的社区
沪ICP备2024103595号-2
耗时 26 ms
Developed with Cursor