名称: azure-storage-blob-py
描述: |
  Azure Blob Storage 的 Python SDK。用于上传、下载、列出 Blob,管理容器以及 Blob 生命周期。
  触发词:"blob storage", "BlobServiceClient", "ContainerClient", "BlobClient", "upload blob", "download blob"。
package: azure-storage-blob
用于 Azure Blob Storage 的客户端库——专为存储非结构化数据设计的对象存储服务。
pip install azure-storage-blob azure-identity
AZURE_STORAGE_ACCOUNT_NAME=<你的存储账户名>
# 或使用完整 URL
AZURE_STORAGE_ACCOUNT_URL=https://<账户名>.blob.core.windows.net
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
# Resolve the account endpoint first, then authenticate with
# DefaultAzureCredential and build the account-level client.
account_url = "https://<账户名>.blob.core.windows.net"
credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(account_url=account_url, credential=credential)
| 客户端 | 用途 | 获取方式 |
|---|---|---|
| `BlobServiceClient` | 账户级操作 | 直接实例化 |
| `ContainerClient` | 容器操作 | `blob_service_client.get_container_client()` |
| `BlobClient` | 单个 Blob 操作 | `container_client.get_blob_client()` |
# Get a client scoped to the container, then create the container itself.
container_name = "mycontainer"
container_client = blob_service_client.get_container_client(container_name)
container_client.create_container()
import io

# Get a client for the target blob inside the container.
blob_client = blob_service_client.get_blob_client(
    container="mycontainer",
    blob="sample.txt",
)

# Upload from a file path; the file is opened in binary mode and closed
# automatically when the `with` block exits.
with open("./local-file.txt", "rb") as data:
    blob_client.upload_blob(data, overwrite=True)

# Upload from bytes / str
blob_client.upload_blob(b"Hello, World!", overwrite=True)

# Upload from a stream
stream = io.BytesIO(b"Stream content")
blob_client.upload_blob(stream, overwrite=True)
import io

blob_client = blob_service_client.get_blob_client(
    container="mycontainer",
    blob="sample.txt",
)

# Download to a file: stream straight into the open file handle instead of
# buffering the whole blob in memory with readall() first.
with open("./downloaded.txt", "wb") as file:
    download_stream = blob_client.download_blob()
    download_stream.readinto(file)

# Download into memory
download_stream = blob_client.download_blob()
content = download_stream.readall()  # bytes

# Read into an existing buffer; readinto() returns the number of bytes read
stream = io.BytesIO()
num_bytes = blob_client.download_blob().readinto(stream)
container_client = blob_service_client.get_container_client("mycontainer")

# List every blob in the container
for blob in container_client.list_blobs():
    print(f"{blob.name} - {blob.size} bytes")

# List by prefix (folder-like)
for blob in container_client.list_blobs(name_starts_with="logs/"):
    print(blob.name)

# Walk the blob hierarchy (virtual directories): items carrying a "prefix"
# are reported as directories, everything else as a blob.
for item in container_client.walk_blobs(delimiter="/"):
    if item.get("prefix"):
        print(f"目录: {item['prefix']}")
    else:
        print(f"Blob: {item.name}")
# Delete the blob
# NOTE(review): the two calls below look like alternative forms rather than a
# sequence to run back to back — confirm.
blob_client.delete_blob()
# Delete the blob together with its snapshots
blob_client.delete_blob(delete_snapshots="include")
# BlobClient is constructed directly here, so it must be imported explicitly.
from azure.storage.blob import BlobClient

# Configure chunk sizes for large uploads/downloads
blob_client = BlobClient(
    account_url=account_url,
    container_name="mycontainer",
    blob_name="large-file.zip",
    credential=credential,
    max_block_size=4 * 1024 * 1024,  # 4 MiB blocks
    max_single_put_size=64 * 1024 * 1024,  # 64 MiB single-upload limit
)

# Parallel upload: open the source file here so `data` is a live, readable
# stream for the duration of the call.
with open("./large-file.zip", "rb") as data:
    blob_client.upload_blob(data, max_concurrency=4)

# Parallel download
download_stream = blob_client.download_blob(max_concurrency=4)
from datetime import datetime, timedelta, timezone
from azure.storage.blob import generate_blob_sas, BlobSasPermissions
# The token expires one hour from now (timezone-aware UTC) and grants read only.
expires_at = datetime.now(timezone.utc) + timedelta(hours=1)
read_only = BlobSasPermissions(read=True)

sas_token = generate_blob_sas(
    account_name="<账户名>",
    container_name="mycontainer",
    blob_name="sample.txt",
    account_key="<账户密钥>",  # or use a user delegation key
    permission=read_only,
    expiry=expires_at,
)

# Use the SAS token as the query string of the blob URL
blob_url = f"https://<账户名>.blob.core.windows.net/mycontainer/sample.txt?{sas_token}"
from azure.storage.blob import ContentSettings

# Fetch the blob's properties and print a few of them
properties = blob_client.get_blob_properties()
print(f"大小: {properties.size}")
print(f"内容类型: {properties.content_settings.content_type}")
print(f"最后修改时间: {properties.last_modified}")

# Set metadata
blob_metadata = {"category": "logs", "year": "2024"}
blob_client.set_blob_metadata(metadata=blob_metadata)

# Set the content type
json_settings = ContentSettings(content_type="application/json")
blob_client.set_http_headers(content_settings=json_settings)
from azure.identity.aio import DefaultAzureCredential
from azure.storage.blob.aio import BlobServiceClient
async def upload_async():
    """Upload ./file.txt to mycontainer/sample.txt using the async client.

    Overwrites the destination blob (overwrite=True).
    """
    # Enter both the async credential and the async client with `async with`
    # so each one is closed when the block exits, even if the upload raises.
    async with DefaultAzureCredential() as credential:
        async with BlobServiceClient(account_url, credential=credential) as client:
            blob_client = client.get_blob_client("mycontainer", "sample.txt")
            with open("./file.txt", "rb") as data:
                await blob_client.upload_blob(data, overwrite=True)
# Async download
async def download_async():
    """Download mycontainer/sample.txt using the async client.

    Returns:
        The blob content as bytes.
    """
    # Create the async credential inside the function so it does not depend
    # on a `credential` variable defined elsewhere; `async with` closes the
    # credential and the client on exit.
    async with DefaultAzureCredential() as credential:
        async with BlobServiceClient(account_url, credential=credential) as client:
            blob_client = client.get_blob_client("mycontainer", "sample.txt")
            stream = await blob_client.download_blob()
            data = await stream.readall()
    # Hand the content back to the caller instead of discarding it.
    return data
- 使用 `DefaultAzureCredential` 而非连接字符串
- 显式设置 `overwrite=True` 以覆盖重传
- 使用 `max_concurrency` 提升大文件传输效率
- 优先使用 `readinto()` 而非 `readall()` 以优化内存
- 使用 `walk_blobs()` 进行层次化列表操作