Introduction
In this post, we will build an application in Go that lets us stream data in “real time” from an S3 bucket using gRPC. To do this we will:
- Create an S3 bucket
- Generate test data with Golang and store it in our bucket
- Define protocol buffer and gRPC
- Implement gRPC streaming in our server
- Test our server implementing a simple test client
If half of the terms here don’t mean anything to you, no problem: we clarify everything in this article. If you want to dig deeper into these topics yourself, here are some good references:
- AWS S3: https://aws.amazon.com/s3/?nc1=h_ls
- gRPC: https://grpc.io/
- gRPC-web: https://github.com/grpc/grpc-web
- Protocol-Buffer: https://developers.google.com/protocol-buffers/docs/proto3
The repository that contains the final code can be found here: https://github.com/AICDEV/s3_streaming_with_golang
The Project
Data
Now that we know what we want to build, we still need data. We will use a simple example: the account transactions of a user. An example data set could look like this:
User:
- email
- ID
- []Transaction
Transaction
- amount
- currency
- recipient
- reason
- verification_key
For our S3 storage this means that each user ID gets its own folder in the bucket, in which we store the serialized transactions in chunks of 500 per file. The layout looks like this:
<bucket>/<user-id>/transaction-<offset>
For our application this means that we have a gRPC method “GetUserTransactions” which expects the user ID as a parameter, loads the data from S3, and sends it to the client as a stream from a goroutine.
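Before writing the real implementation, the pattern is worth sketching in isolation: the handler starts the S3 download in a goroutine and forwards each chunk over a channel to the client stream. The sketch below uses stand-in types and a `sendToClient` callback in place of the generated gRPC stream, so the names are illustrative, not the real API:

```go
package main

import "fmt"

// TransactionCollection stands in for the generated protobuf type.
type TransactionCollection struct{ Count int }

// loadChunks simulates downloading serialized chunks from S3 for a user,
// pushing each one onto the channel as soon as it is available.
func loadChunks(userID string, c chan *TransactionCollection) {
	for i := 0; i < 3; i++ { // pretend the user folder holds 3 chunk files
		c <- &TransactionCollection{Count: 500}
	}
	close(c)
}

// streamUserTransactions mirrors the GetUserTransactions handler idea: start
// the S3 download in a goroutine and forward every chunk to the client
// (here via sendToClient, a stand-in for the generated stream.Send).
func streamUserTransactions(userID string, sendToClient func(*TransactionCollection) error) error {
	c := make(chan *TransactionCollection)
	go loadChunks(userID, c)
	for tc := range c {
		if err := sendToClient(tc); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	total := 0
	streamUserTransactions("user-1", func(tc *TransactionCollection) error {
		total += tc.Count
		return nil
	})
	fmt.Println(total) // 1500
}
```

The client starts receiving the first chunk while later chunks are still being downloaded; that is the whole point of the streaming design.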
Protocol Buffer and gRPC definition
A proposal for a protocol buffer definition:
syntax = "proto3";

option go_package = "github.com/aicdev/s3_streaming_with_golang";

package transactions;

message User {
    string email = 1;
    string id = 2;
    repeated Transaction transactions = 3;
}

message TransactionCollection {
    repeated Transaction transactions = 1;
}

message Transaction {
    double anmount = 1;

    enum Currency {
        EUR = 0;
        USD = 1;
    }

    Currency currency = 2;
    string recipient = 3;
    string reason = 4;
    string verification_key = 5;
    string user_id = 6;
}

service TransactionService {
    rpc GetUserTransactions(User) returns (stream Transaction) {}
}
Now that we have defined our protocol buffer and our service (rpc), we still need to compile the definition to Go. For this we first create our Go project. Please make sure that you have installed the following dependencies (if you run into trouble with Go and protocol buffers, check: https://grpc.io/docs/languages/go/quickstart/):
go get -u google.golang.org/protobuf/cmd/protoc-gen-go
go get -u google.golang.org/grpc/cmd/protoc-gen-go-grpc
go get -u google.golang.org/grpc
The next step is to create our build script. For this, create the file “build_golang_proto.sh” and a folder “proto” inside the project folder, and paste the following content into build_golang_proto.sh:
protoc --go_out=./proto --go_opt=paths=source_relative --go-grpc_out=./proto --go-grpc_opt=paths=source_relative -I . transaction.proto
Then run the script and check the output inside the ./proto folder. I recommend reading through the generated code. This image here is only a short preview:

After that my project folder looks like this:

AWS S3 Bucket
In AWS we need two things:
- A user or API credentials so we can connect to S3 programmatically.
- A bucket
For the first point, just follow the official docs from AWS: https://docs.aws.amazon.com/powershell/latest/userguide/pstools-appendix-sign-up.html
For the bucket just search for S3 in the aws service catalog and click on it:

Then click on the big orange button “create bucket” and enter your bucket options. My bucket looks like this:

And after creation you should see something like this:

Demo data creation
Time for the first lines of code. Let’s start by generating a few thousand test transactions. But before that: we want to control our program via command-line arguments to set a mode.
--mode=test = test data generation
--mode=stream = streaming from s3
To do this we need to install the following Go module: https://github.com/alecthomas/kingpin
go get gopkg.in/alecthomas/kingpin.v2
My test data generator looks like this:
package data

import (
    "crypto/sha512"
    "encoding/hex"
    "fmt"
    "math/rand"
    "time"

    pb "github.com/aicdev/s3_streaming_with_golang/proto"
)

const letters = "abcdefghijklmnopqrstuvwxyz"

type TestDataServiceInterface interface {
    CreateTestData(int)
    GetTestData() []*pb.User
}

type testDataService struct {
    testData []*pb.User
}

func NewTestDataService() TestDataServiceInterface {
    rand.Seed(time.Now().Unix())
    return &testDataService{}
}

func (tsd *testDataService) CreateTestData(limit int) {
    for i := 0; i < limit; i++ {
        email := fmt.Sprintf("%s@%s.%s", getRandomStringBytes(8), getRandomStringBytes(4), getRandomStringBytes(2))
        testUser := &pb.User{
            Email: email,
            Id:    GetHash(email),
        }

        tsd.createTestTransactions(testUser)
        tsd.testData = append(tsd.testData, testUser)
    }
}

func (tsd *testDataService) GetTestData() []*pb.User {
    return tsd.testData
}

func (tsd *testDataService) createTestTransactions(testUser *pb.User) {
    for i := 0; i < 5000; i++ {
        testUser.Transactions = append(testUser.Transactions, &pb.Transaction{
            Anmount:         rand.Float64(),
            Recipient:       fmt.Sprintf("%s@%s.%s", getRandomStringBytes(8), getRandomStringBytes(4), getRandomStringBytes(2)),
            Reason:          getRandomStringBytes(20),
            VerificationKey: fmt.Sprintf("%s-%s", getRandomStringBytes(4), getRandomStringBytes(8)),
            UserId:          testUser.GetId(),
            Currency:        pb.Transaction_USD,
        })
    }
}

// inspired by https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-go
func getRandomStringBytes(limit int) string {
    b := make([]byte, limit)
    for i := range b {
        b[i] = letters[rand.Intn(len(letters))]
    }
    return string(b)
}

func GetHash(t string) string {
    sum := sha512.Sum512([]byte(t))
    return hex.EncodeToString(sum[:])
}
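As a sanity check on the helpers above: `GetHash` is deterministic and always returns a 128-character hex string, since SHA-512 produces 64 bytes and hex encoding doubles that. A minimal standalone check (the sample email is my own):

```go
package main

import (
	"crypto/sha512"
	"encoding/hex"
	"fmt"
)

// GetHash mirrors the helper above: SHA-512 of the input, hex-encoded.
func GetHash(t string) string {
	sum := sha512.Sum512([]byte(t))
	return hex.EncodeToString(sum[:])
}

func main() {
	id := GetHash("alice@example.com")
	// 64 bytes of SHA-512 output become 128 hex characters, the same
	// shape as the user IDs we will pass to the streaming client later.
	fmt.Println(len(id)) // 128
}
```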
Let’s test whether we get some garbage data. For this we create a new TestDataService and call the generation:
package app

import (
    "fmt"
    "log"
    "os"

    "github.com/aicdev/s3_streaming_with_golang/data"

    "gopkg.in/alecthomas/kingpin.v2"
)

var (
    mode = kingpin.Flag("mode", "mode to start application").Default("stream").String()
)

func StartApplication() {
    kingpin.Parse()
    log.Printf("starting streaming service in mode: %s", *mode)

    if *mode == "test" {
        runTestDataCreation()
    }
}

func runTestDataCreation() {
    testDataCreator := data.NewTestDataService()
    testDataCreator.CreateTestData(5)

    for _, v := range testDataCreator.GetTestData() {
        fmt.Println(v.GetTransactions())
    }

    os.Exit(0)
}
Then start the application by running:
go run main.go --mode=test
And we get some nice garbage output:

Now that we have our test data we still need to store it in AWS S3.
Save data in AWS S3
One important resource is the AWS SDK documentation: https://aws.amazon.com/sdk-for-go/?nc1=h_ls and the getting started guide where the installation is documented: https://aws.github.io/aws-sdk-go-v2/docs/getting-started/. Notice that my example is not based on API version 2, but it should work there as well.
To install in short run:
go get github.com/aws/aws-sdk-go/aws
# if there are any weird behaviours run
# go mod tidy
And to parse environment variables in a nice way install:
go get github.com/kelseyhightower/envconfig
My AWS session handler looks like this:
package cloud

import (
    "log"

    "github.com/aicdev/s3_streaming_with_golang/env"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
)

type AwsServiceInterface interface {
    GetSession() *session.Session
}

type awsService struct {
    Session *session.Session
}

var (
    AwsService AwsServiceInterface = &awsService{}
)

func (as *awsService) GetSession() *session.Session {
    if as.Session != nil {
        return as.Session
    }

    parsedEnv, _ := env.ParseEnvironmentVariables()

    sess, err := session.NewSession(&aws.Config{
        Region:      &parsedEnv.S3Region,
        Credentials: credentials.NewEnvCredentials(),
    })
    if err != nil {
        log.Fatal(err)
    }

    as.Session = sess
    return as.Session
}
And the environment variable parser:
package env

import (
    "github.com/kelseyhightower/envconfig"
)

type Streamingnvironment struct {
    Port         string `required:"true" default:"9999"`
    S3Region     string `required:"true" default:"eu-central-1"`
    S3BucketName string `required:"true"`
}

func ParseEnvironmentVariables() (*Streamingnvironment, error) {
    env := &Streamingnvironment{}

    if err := envconfig.Process("streaming_example", env); err != nil {
        return env, err
    }

    return env, nil
}
The next important step is that, when we start our application, we check that all required environment variables are set. For this we use the init function in our app.go and add this little piece of code:
package app

import (
    "fmt"
    "log"
    "os"

    "github.com/aicdev/s3_streaming_with_golang/data"
    "github.com/aicdev/s3_streaming_with_golang/env"

    "gopkg.in/alecthomas/kingpin.v2"
)

var (
    mode = kingpin.Flag("mode", "mode to start application").Default("stream").String()
)

func init() {
    _, err := env.ParseEnvironmentVariables()
    if err != nil {
        log.Fatal(err.Error())
    }
}

func StartApplication() {
    kingpin.Parse()
    log.Printf("starting streaming service in mode: %s", *mode)

    if *mode == "test" {
        runTestDataCreation()
    }
}

func runTestDataCreation() {
    testDataCreator := data.NewTestDataService()
    testDataCreator.CreateTestData(5)

    for _, v := range testDataCreator.GetTestData() {
        fmt.Println(v.GetTransactions())
    }

    os.Exit(0)
}
The next step is to implement our uploader. Notice that the uploader has some “magic” inside: in order to stream thousands of transactions, we chunk them into groups of at most 500. We serialize every group of 500 transactions and store it in the user’s folder in S3. Instead of downloading maybe 50 MB of user data at once and sending it back to the client, we can then start streaming each part as soon as it is downloaded.
package uploader

import (
    "bytes"
    "fmt"
    "log"

    cloud "github.com/aicdev/s3_streaming_with_golang/aws"
    "github.com/aicdev/s3_streaming_with_golang/env"
    pb "github.com/aicdev/s3_streaming_with_golang/proto"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"

    "google.golang.org/protobuf/proto"
)

type UploaderServiceInterface interface {
    UploadTestData(*pb.User)
}

type uploaderService struct {
    collectionChunkSize int
}

func NewUploaderService() UploaderServiceInterface {
    return &uploaderService{
        collectionChunkSize: 500,
    }
}

func (ups *uploaderService) UploadTestData(user *pb.User) {
    transactions := user.GetTransactions()

    // walk the transaction list in windows of at most collectionChunkSize
    for i := 0; i < len(transactions); i += ups.collectionChunkSize {
        end := i + ups.collectionChunkSize
        if end > len(transactions) {
            end = len(transactions)
        }

        tc := &pb.TransactionCollection{
            Transactions: transactions[i:end],
        }

        raw, err := proto.Marshal(tc)
        if err != nil {
            log.Println(err.Error())
            continue
        }

        ups.upload(user.GetId(), fmt.Sprintf("transaction-%d", i), raw)
    }
}

func (ups *uploaderService) upload(userId string, key string, raw []byte) {
    parsedEnv, _ := env.ParseEnvironmentVariables()
    session := cloud.AwsService.GetSession()
    up := s3manager.NewUploader(session)

    _, err := up.Upload(&s3manager.UploadInput{
        Bucket: aws.String(parsedEnv.S3BucketName),
        Key:    aws.String(fmt.Sprintf("%s/%s", userId, key)),
        Body:   bytes.NewReader(raw),
    })
    if err != nil {
        log.Println(err.Error())
    }
}
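The chunking loop in the uploader amounts to slicing the transaction list into windows of at most 500 elements. A minimal, generic version of that logic (the helper name is my own):

```go
package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements,
// matching the 500-transactions-per-file scheme used by the uploader.
func chunk(items []int, size int) [][]int {
	var out [][]int
	for i := 0; i < len(items); i += size {
		end := i + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[i:end])
	}
	return out
}

func main() {
	items := make([]int, 1234)
	parts := chunk(items, 500)
	fmt.Println(len(parts), len(parts[0]), len(parts[2])) // 3 500 234
}
```

Note that the last chunk is simply shorter; no transaction is dropped and none is uploaded twice.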
And call the uploader from our app.go
package app

import (
    "log"
    "os"

    "github.com/aicdev/s3_streaming_with_golang/data"
    "github.com/aicdev/s3_streaming_with_golang/env"
    "github.com/aicdev/s3_streaming_with_golang/uploader"

    "gopkg.in/alecthomas/kingpin.v2"
)

var (
    mode = kingpin.Flag("mode", "mode to start application").Default("stream").String()
)

func init() {
    _, err := env.ParseEnvironmentVariables()
    if err != nil {
        log.Fatal(err.Error())
    }
}

func StartApplication() {
    kingpin.Parse()
    log.Printf("starting streaming service in mode: %s", *mode)

    switch *mode {
    case "test":
        runTestDataCreation()
    case "stream":
        bootRPCService()
    default:
        log.Fatalf("unknown mode: %s", *mode)
    }
}

func runTestDataCreation() {
    uploader := uploader.NewUploaderService()
    testDataCreator := data.NewTestDataService()
    testDataCreator.CreateTestData(1)

    for _, v := range testDataCreator.GetTestData() {
        uploader.UploadTestData(v)
    }

    os.Exit(0)
}

func bootRPCService() {
    // the gRPC server bootstrap goes here; see the linked repository for the full implementation
}
Let’s run our application (make sure that you have your AWS access credentials set as environment variables):
go run main.go --mode=test
The result in your bucket should look like this:


Summary
Let’s recap what we have done so far.
- We created a protocol buffer definition with an rpc service definition and compiled it to Go.
- We created an S3 bucket in AWS.
- We implemented a test data creation mode and can store each user’s transaction data as serialized bytes in S3.
Stream data from AWS S3
Now that we have test data, we can implement our RPC server and send the data from AWS as a stream to the client. For this we need a downloader that can fetch the entries in a user’s folder, and a streaming service that deserializes the downloaded bytes and pushes the result to the client stream.
My downloader looks like this:
package downloader

import (
    cloud "github.com/aicdev/s3_streaming_with_golang/aws"
    "github.com/aicdev/s3_streaming_with_golang/env"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

type DownloaderServiceInterface interface {
    DownloadFromS3(string) ([]byte, error)
}

type downloadService struct {
    env *env.Streamingnvironment
}

func NewDownloaderService() DownloaderServiceInterface {
    parsedEnv, _ := env.ParseEnvironmentVariables()

    return &downloadService{
        env: parsedEnv,
    }
}

func (dw *downloadService) DownloadFromS3(key string) ([]byte, error) {
    awsSession := cloud.AwsService.GetSession()

    dwm := s3manager.NewDownloader(awsSession, func(d *s3manager.Downloader) {
        d.Concurrency = 10
    })

    raw := &aws.WriteAtBuffer{}
    _, err := dwm.Download(raw, &s3.GetObjectInput{
        Bucket: aws.String(dw.env.S3BucketName),
        Key:    aws.String(key),
    })
    if err != nil {
        return nil, err
    }

    return raw.Bytes(), nil
}
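The `aws.WriteAtBuffer` passed to the download manager satisfies `io.WriterAt`, which is what allows the S3 download manager to write chunks concurrently at arbitrary offsets. A tiny simplified stand-in (not the real AWS type) shows the idea:

```go
package main

import (
	"fmt"
	"sync"
)

// writeAtBuffer is a minimal in-memory io.WriterAt that grows as needed,
// similar in spirit to aws.WriteAtBuffer.
type writeAtBuffer struct {
	mu  sync.Mutex
	buf []byte
}

func (w *writeAtBuffer) WriteAt(p []byte, off int64) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	need := int(off) + len(p)
	if need > len(w.buf) {
		w.buf = append(w.buf, make([]byte, need-len(w.buf))...)
	}
	copy(w.buf[off:], p)
	return len(p), nil
}

func main() {
	var w writeAtBuffer
	// Chunks may arrive out of order when Concurrency > 1.
	w.WriteAt([]byte("world"), 6)
	w.WriteAt([]byte("hello "), 0)
	fmt.Println(string(w.buf)) // hello world
}
```

This is why the downloader can use a Concurrency of 10 and still return the object’s bytes in the correct order.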
And my streaming service:
package services

import (
    "log"

    cloud "github.com/aicdev/s3_streaming_with_golang/aws"
    "github.com/aicdev/s3_streaming_with_golang/downloader"
    "github.com/aicdev/s3_streaming_with_golang/env"
    pb "github.com/aicdev/s3_streaming_with_golang/proto"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/s3"

    "google.golang.org/protobuf/proto"
)

type StreamingServiceInterface interface {
    Stream(string, chan *pb.TransactionCollection)
}

type streamingService struct{}

var (
    StreamingService StreamingServiceInterface = &streamingService{}
)

func (ss *streamingService) Stream(id string, c chan *pb.TransactionCollection) {
    // close the channel on every exit path so the consumer never blocks forever
    defer close(c)

    parsedEnv, _ := env.ParseEnvironmentVariables()
    sc := cloud.AwsService.GetSession()
    bucketClient := s3.New(sc)
    dl := downloader.NewDownloaderService()

    var continueToken *string
    for {
        resp, err := bucketClient.ListObjectsV2(&s3.ListObjectsV2Input{
            Bucket:            aws.String(parsedEnv.S3BucketName),
            Prefix:            aws.String(id),
            ContinuationToken: continueToken,
        })
        if err != nil {
            log.Println(err)
            return
        }

        for _, key := range resp.Contents {
            rawBytes, err := dl.DownloadFromS3(*key.Key)
            if err != nil {
                log.Println(err)
                return
            }

            transactionCollection := &pb.TransactionCollection{}
            if err := proto.Unmarshal(rawBytes, transactionCollection); err != nil {
                log.Println(err)
                return
            }

            if len(transactionCollection.GetTransactions()) > 0 {
                c <- transactionCollection
            }
        }

        if !aws.BoolValue(resp.IsTruncated) {
            break
        }
        continueToken = resp.NextContinuationToken
    }
}
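The `ListObjectsV2` loop follows the standard continuation-token pagination pattern: keep listing until `IsTruncated` is false, feeding `NextContinuationToken` back into the next request. The same control flow with a hypothetical in-memory lister (all names here are illustrative):

```go
package main

import "fmt"

// page mimics one ListObjectsV2 response.
type page struct {
	keys      []string
	truncated bool
	nextToken string
}

// fakeList returns pages of keys, simulating S3 pagination.
func fakeList(token string) page {
	switch token {
	case "":
		return page{keys: []string{"u1/transaction-0", "u1/transaction-500"}, truncated: true, nextToken: "t1"}
	default:
		return page{keys: []string{"u1/transaction-1000"}}
	}
}

// collectKeys walks all pages, exactly like the streaming service's loop.
func collectKeys() []string {
	var all []string
	token := ""
	for {
		resp := fakeList(token)
		all = append(all, resp.keys...)
		if !resp.truncated {
			break
		}
		token = resp.nextToken
	}
	return all
}

func main() {
	fmt.Println(len(collectKeys())) // 3
}
```

S3 returns at most 1000 keys per page, so this loop matters as soon as a user has more than 1000 chunk files.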
Implement a simple test client
Now it’s time to test our implementation. For this we create a simple rpc client:
package main

import (
    "context"
    "io"
    "log"
    "time"

    pb "github.com/aicdev/s3_streaming_with_golang/proto"

    "google.golang.org/grpc"
)

func main() {
    con, err := grpc.Dial("localhost:9999", grpc.WithInsecure())
    if err != nil {
        log.Fatalf("connection error: %v", err)
    }
    defer con.Close()

    c := pb.NewTransactionServiceClient(con)

    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    stream, err := c.GetUserTransactions(ctx, &pb.User{
        Id: "acb03ff98e576ec4b5437d791d85d98c523821ecb61457668928f6b4c204ab944f5be2ca3605359d3a41aceb4946151a56fa16fdc09161e8a65129c19b4da83b",
    })
    if err != nil {
        log.Fatalf("unable to get data: %v", err)
    }

    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            return
        }
        if err != nil {
            log.Fatalf("cannot receive %v", err)
        }
        log.Printf("Resp received: %v", resp)
    }
}
Call the client:
go run client.go
And watch the output stream:

Summary
I hope this post has shown how to work with gRPC, AWS S3, and Go. Please keep in mind that these are only code fragments and make no claim to being production-ready; proper error handling, for example, is missing. But feel free to play around, modify, and copy pieces of the code.
Happy streaming.