AWS-lambda同步S3数据到BOS
概览
利用 AWS Lambda 来实时同步用户上传到 S3 Bucket 的 Object 到 BOS Bucket。
需求场景
AWS Lambda 是一项计算服务,无需预配置或管理服务器即可运行代码,你可以通过配置 AWS Lambda的触发器来执行你上传的函数代码。因此,我们可以利用 AWS Lambda 来实时同步用户上传到 S3 Bucket 的 Object 到 BOS Bucket。
方案概述
您可以登录 AWS,使用 Lambda 设置针对文件上传的监听规则,并根据该操作触发文件同步到 BOS 的操作。
实践步骤
-
登录AWS 控制台,进入Lambda服务控制台,点击“创建函数”
-
从头开始创建函数,该示例中运行语言选择Go 1.x(后续示例lambda程序也是Go程序,用户可以选择自己擅长的语言来实现lambda函数),角色选择从模板创建新角色即可
-
配置Lambda函数Desiger、函数代码、并发、内存分配、超时时间,环境变量等,按需配置
-
基本配置:按需配置,例如你的代码执行需要的内存大小,超时时间、环境变量
注意事项
- 超时时间建议填1分钟以上,Lambda函数从S3下载文件,再上传到BOS耗时与Object大小有关,有时会比较长,按实际情况设置即可;
- Lambda函数保存Object到磁盘,只能选择/tmp/目录,其他目录没有写权限;
- 内存配置与函数运行时需要的内存大小有关,按实际需要设置即可。
-
上传lambda代码:上传代码必须为zip格式,处理程序填写Go文件的文件名
-
配置触发器:我们选择S3触发器,进一步选择需要监听的S3存储桶,还能通过object前后缀缩小监听事件范围;添加之后会看到触发器内容为:存储桶: s3/bucket-name 事件类型: ObjectCreated
-
-
保存配置,用户每次上传object到S3 bucket,就会执行上传的Go lambda代码,同步该object到BOS;进入CloudWatch控制台可以查看程序执行的日志
Lambda函数代码
-
库环境准备:参考AWS S3 Golang sdk和BOS Golang sdk使用教程
Plain Text1go get -u github.com/aws/aws-sdk-go/... 2go get github.com/aws/aws-lambda-go/lambda 3go get github.com/baidubce/bce-sdk-go
-
代码保存在s32bos.go文件中,编译成二进制文件之后压缩成zip文件上传,命令如下
Plain Text1GOOS=linux GOARCH=amd64 go build -o s32bos s32bos.go 2zip s32bos.zip s32bos
-
s32bos.go代码:
Plain Text1package main 2 3import ( 4 "context" 5 "fmt" 6 "os" 7) 8 9import ( 10 "github.com/aws/aws-lambda-go/events" 11 "github.com/aws/aws-lambda-go/lambda" 12 "github.com/aws/aws-sdk-go/aws" 13 "github.com/aws/aws-sdk-go/aws/credentials" 14 "github.com/aws/aws-sdk-go/aws/session" 15 "github.com/aws/aws-sdk-go/service/s3" 16 "github.com/aws/aws-sdk-go/service/s3/s3manager" 17 "github.com/baidubce/bce-sdk-go/services/bos" 18) 19 20var ( 21 S3_REGION = "ap-northeast-1" 22 S3_AK = "AKIAJ*****LWGRYAULYTA" 23 S3_SK = "ABZrI****RqhrE8VzvuESvQMWXcsOPiMJ7" 24 25 BOS_AK = "388e4e0b*****cfff3cbecb5" 26 BOS_SK = "2db53e648c53456e***5b7d378" 27 BOS_ENDPOINT = "http://bj.bcebos.com" 28 BOS_BUCKET = "bucket-name" 29) 30 31func downloadFromS3(bucket string, object string) { 32 file, err := os.Create("/tmp/" + object) 33 if err != nil { 34 fmt.Printf("Unable to open file, err %v", err) 35 } 36 defer file.Close() 37 // init s3 downloader 38 sess, err := session.NewSession(&aws.Config{ 39 Region: aws.String(S3_REGION), 40 Credentials: credentials.NewStaticCredentials(S3_AK, S3_SK, ""), 41 }) 42 downloader := s3manager.NewDownloader(sess) 43 44 numBytes, err := downloader.Download(file, 45 &s3.GetObjectInput{ 46 Bucket: aws.String(bucket), 47 Key: aws.String(object), 48 }) 49 if err != nil { 50 fmt.Printf("Unable to download object %s, size %d, err %v", object, numBytes, err) 51 } 52 fmt.Println("download from s3 success") 53} 54 55func upload2Bos(bucket string, object string) { 56 bosClient, err := bos.NewClient(BOS_AK, BOS_SK, BOS_ENDPOINT) 57 if err != nil { 58 fmt.Printf("create bos client err %v", err) 59 return 60 } 61 62 etag, err := bosClient.PutObjectFromFile(bucket, object, "/tmp/"+object, nil) 63 if err != nil { 64 fmt.Printf("put object to bos err %v", err) 65 return 66 } 67 fmt.Printf("upload2Bos success etag: %s", etag) 68} 69 70func HandleLambdaEvent(ctx context.Context, s3Event events.S3Event) { 71 fmt.Println("start to handle lambda event") 72 for _, record := range s3Event.Records { 73 s3 := record.S3 74 fmt.Printf("[%s - %s] Bucket = %s, Object = %s \n", record.EventSource, record.EventTime, s3.Bucket.Name, s3.Object.Key) 75 76 downloadFromS3(s3.Bucket.Name, s3.Object.Key) 77 upload2Bos(BOS_BUCKET, s3.Object.Key) 78 79 } 80 81 fmt.Println("handle lambda event success") 82} 83 84func main() { 85 lambda.Start(HandleLambdaEvent) 86}