bws-rs:Rust 编写的 S3 协议网关框架,支持灵活后端接入
bws-rs:Rust 编写的 S3 协议网关框架,支持灵活后端接入
bws-rs介绍
bws-rs
是一个用 Rust 编写的轻量级 S3 协议服务端网关框架,旨在帮助开发者快速构建兼容 AWS S3 协议的对象存储服务。该框架支持 S3 V4 签名校验,集成 Axum 作为 Web 框架,所有协议校验逻辑通过实现对应的 trait 并注册为 axum::Extension 实现非侵入式扩展,具有良好的可维护性与可插拔性。
bws-rs 可作为前端网关挂载在你已有的文件系统、对象存储系统甚至缓存引擎之前,为其提供标准化的 S3 协议兼容层,支持与 AWS CLI、MinIO Client 等主流 S3 SDK 的交互。
✅ 已支持的功能
📁 S3 协议支持列表
-
PutObject(上传对象)
-
GetObject(获取对象)
-
HeadObject(获取对象元信息)
-
DeleteObject(删除对象)
-
CreateBucket(创建桶)
-
HeadBucket(桶存在性检查)
-
ListBucket(列举所有桶)
-
DeleteBucket(删除桶)
-
GetBucketLocation(获取桶区域)
-
MultipartUpload(分片上传)
-
Range Get(部分下载)
-
Get/Put Object ACL(访问控制列表)
-
Get/Put Object Metadata(对象元数据)
-
Put Object Tagging(对象标签)
✅ MinIO SDK 兼容性验证
使用 MinIO Go SDK 进行功能验证,支持以下操作:
-
MakeBucket
-
DeleteBucket
-
ListBucket
-
ListObject
-
PutObject
-
DeleteObject
-
BucketExists
在项目中使用bws-rs: cargo add bws-rs
实现bws_rs::service::s3
下对应的trait以支持对应的s3 功能
- HeadHandler: 对应 s3 head object、head bucket
- GetObjectHandler: 对应 s3 GetObject
- PutObjectHandler: 对应 s3 PutObject
- DeleteObjectHandler: 对应 s3 DeleteObject
- ListObjectHandler: 对应 s3 ListObject
- CreateBucketHandler: 对应 s3 create bucket
- ListBucketHandler: 对应 s3 list bucket
- DeleteBucketHandler: 对应 s3 delete bucket
- GetBucketLocationHandler: 对应 s3 get bucket location
- MultiUploadObjectHandler: 对应 s3 MultiUpload 系列操作
accesskey 仓库需要实现 bws_rs::authorization::AccesskeyStore,
来提供对应 accesskey 的 secretkey
使用示范
use std::sync::Arc;use tokio::io::AsyncReadExt;#[derive(Default)]struct Target {}use crate::service::s3::*;impl CreateBucketHandler for Target {fn handle<'a>(&'a self,_opt: &'a CreateBucketOption,_bucket: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("create bucket {_bucket}");Ok(())})}}impl ListBucketHandler for Target {fn handle<'a>(&'a self,_opt: &'a ListBucketsOption,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<Vec<Bucket>, String>>>,> {Box::pin(async move {let datetime = chrono::Utc::now().to_rfc3339();Ok(vec![Bucket {name: "test1".to_string(),creation_date: datetime,bucket_region: "us-east-1".to_string(),}])})}}impl HeadHandler for Target {fn lookup<'a>(&self,_bucket: &str,_object: &str,) -> std::pin::Pin<Box<dyn 'a+ Send+ Sync+ std::future::Future<Output = Result<Option<HeadObjectResult>, Error>>,>,> {Box::pin(async move {let mut ret: HeadObjectResult = Default::default();ret.checksum_sha256 = Some("2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824".to_string(),);ret.content_length = Some(5);ret.etag = Some("5d41402abc4b2a76b9719d911017c592".to_string());ret.last_modified = Some(chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(),);Ok(Some(ret))})}}impl PutObjectHandler for Target {fn handle<'a>(&'a self,opt: &PutObjectOption,bucket: &'a str,object: &'a str,body: &'a mut (dyn tokio::io::AsyncRead + Unpin + Send),) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("put bucket {bucket} object {object}");let mut buff = vec![];match body.read_to_end(&mut buff).await {Ok(size) => {log::info!("get {}", unsafe {std::str::from_utf8_unchecked(&buff[..size])});}Err(err) => {log::error!("read error {err}");}}Ok(())})}}impl DeleteBucketHandler for Target {fn handle<'a>(&'a self,_opt: &'a DeleteBucketOption,_bucket: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send 
+ std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("delete bucket {_bucket}");Ok(())})}}impl DeleteObjectHandler for Target {fn handle<'a>(&'a self,_opt: &'a DeleteObjectOption,_object: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("delete object {_object}");Ok(())})}}impl crate::authorization::AccesskeyStore for Target {fn get<'a>(&'a self,_accesskey: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + Sync + std::future::Future<Output = Result<Option<String>, String>>,>,> {Box::pin(async move { Ok(Some(format!("{_accesskey}12345"))) })}}impl crate::service::s3::GetObjectHandler for Target {fn handle<'a>(&'a self,bucket: &str,object: &str,opt: crate::service::s3::GetObjectOption,mut out: tokio::sync::Mutex<std::pin::Pin<std::boxed::Box<(dyn crate::utils::io::PollWrite + Send + Unpin + 'a)>,>,>,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {let mut l = out.lock().await;let _ = l.poll_write(b"hello").await.map_err(|err| {log::error!("write error {err}");});Ok(())})}}impl crate::service::s3::GetBucketLocationHandler for Target {}impl MultiUploadObjectHandler for Target {fn handle_create_session<'a>(&'a self,bucket: &'a str,key: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>>{Box::pin(async move { Ok("ffffff".to_string()) })}fn handle_upload_part<'a>(&'a self,bucket: &'a str,key: &'a str,upload_id: &'a str,part_number: u32,body: &'a mut (dyn tokio::io::AsyncRead + Unpin + Send),) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>>{Box::pin(async move {let mut buff = Vec::new();let size = body.read_to_end(&mut buff).await.map_err(|err| log::error!("read body error {err}"))?;println!("upload part upload_id={upload_id} part_number={part_number} bucket={bucket} key={key}\n{}",unsafe { 
std::str::from_boxed_utf8_unchecked((&buff[..size]).into()) });Ok("5d41402abc4b2a76b9719d911017c592".to_string())})}fn handle_complete<'a>(&'a self,bucket: &'a str,key: &'a str,upload_id: &'a str,//(etag,part number)data: &'a [(&'a str, u32)],opts: MultiUploadObjectCompleteOption,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>>{Box::pin(async move { Ok("69a329523ce1ec88bf63061863d9cb14".to_string()) })}fn handle_abort<'a>(&'a self,bucket: &'a str,key: &'a str,upload_id: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), ()>>>>{todo!()}}#[tokio::test]async fn test_server() -> Result<(), Box<dyn std::error::Error>> {let _ = tokio::fs::create_dir_all(".sys_bws").await;env_logger::builder().filter_level(log::LevelFilter::Info).init();let target = Arc::new(Target::default());let r = axum::Router::new().layer(axum::middleware::from_fn(super::handle_fn)).layer(axum::middleware::from_fn(super::handle_authorization_middleware,)).layer(axum::Extension(target.clone() as Arc<dyn PutObjectHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn HeadHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn ListBucketHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn CreateBucketHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn DeleteBucketHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn DeleteObjectHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn crate::authorization::AccesskeyStore + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn GetObjectHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn GetBucketLocationHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn MultiUploadObjectHandler + Send + Sync>));let l = tokio::net::TcpListener::bind("0.0.0.0:9900").await?;axum::serve(l, r).await?;Ok(())}
golang 客户端
package testsimport ("context""io""os""testing""github.com/minio/minio-go/v7""github.com/minio/minio-go/v7/pkg/credentials"
)func TestCreateBucket(t *testing.T) {creds, err := minio.New("127.0.0.1:9900", &minio.Options{Secure: false, Creds: credentials.NewStaticV4("root", "root12345", ""),Region: "us-east-1",})if err != nil {t.Fatal(err)}_, err = creds.BucketExists(context.Background(), "test")if err != nil {t.Fatal(err)}err = creds.MakeBucket(context.Background(), "itest", minio.MakeBucketOptions{})if err != nil {t.Fatal(err)}bkts, err := creds.ListBuckets(context.Background())if err != nil {t.Fatal(err)}t.Log(bkts)err = creds.RemoveBucket(context.Background(), "test")if err != nil {t.Fatal(err)}err = creds.RemoveObject(context.Background(), "test", "test", minio.RemoveObjectOptions{})if err != nil {t.Fatal(err)}err = os.WriteFile("test.txt", []byte("hello"), 0o644)if err != nil {t.Fatal(err)}fd, err := os.OpenFile("test.txt", os.O_RDONLY, 0)if err != nil {t.Fatal(err)}defer fd.Close()_, err = creds.PutObject(context.Background(), "test", "hello/world", fd, 5, minio.PutObjectOptions{})if err != nil {t.Fatal(err)}resp, err := creds.GetObject(context.Background(), "test", "test", minio.GetObjectOptions{})if err != nil {t.Fatal(err)}content, err := io.ReadAll(resp)if err != nil {t.Fatal(err)}if string(content) != "hello" {t.Fatal("expect hello got [" + string(content) + "]")}
}
s3 multipart 验证
package testsimport ("context""crypto/tls""fmt""log""net/http""os""testing""github.com/aws/aws-sdk-go-v2/aws""github.com/aws/aws-sdk-go-v2/config""github.com/aws/aws-sdk-go-v2/credentials""github.com/aws/aws-sdk-go-v2/service/s3""github.com/aws/aws-sdk-go-v2/service/s3/types"
)func TestS3Sdk(t *testing.T) {var (host = "127.0.0.1"port = 9900accesskey = "root"secretkey = "root12345"region = "us-east-1")customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {if service == s3.ServiceID {return aws.Endpoint{URL: fmt.Sprintf("http://%s:%d", host, port),SigningRegion: "us-east-1",}, nil}return aws.Endpoint{}, &aws.EndpointNotFoundError{}})// 加载 AWS 配置,指定自定义端点解析器cfg, err := config.LoadDefaultConfig(context.TODO(),config.WithEndpointResolverWithOptions(customResolver),config.WithHTTPClient(&http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true},},}),config.WithRegion(region),config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accesskey, secretkey, "")),)if err != nil {log.Fatalf("无法加载 AWS 配置: %v", err)}// 创建 S3 客户端cli := s3.NewFromConfig(cfg, func(o *s3.Options) {o.UsePathStyle = true})var (bucket = "itest"key = "test.txt")fd, err := os.OpenFile("./test.txt", os.O_RDONLY, 0)if err != nil {t.Fatal(err)}defer fd.Close()out, err := cli.CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{Bucket: &bucket,Key: &key,})if err != nil {t.Fatal(err)}var upNo int32 = 1resp, err := cli.UploadPart(context.Background(), &s3.UploadPartInput{Bucket: &bucket, Key: &key, PartNumber: &upNo, UploadId: out.UploadId, Body: fd,})if err != nil {t.Fatal(err)}_, err = cli.CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{Bucket: &bucket, Key: &key, UploadId: out.UploadId, MultipartUpload: &types.CompletedMultipartUpload{Parts: []types.CompletedPart{{ETag: resp.ETag, PartNumber: &upNo,},},},})if err != nil {t.Fatal(err)}
}