Introduction

In this post, we will program an application in Golang that allows us to stream data in “real-time” from an S3 bucket using gRPC. To do this we will:

  • Create an S3 bucket
  • Generate test data with Golang and store it in our bucket
  • Define protocol buffer and gRPC
  • Implement gRPC streaming in our server
  • Test our server implementing a simple test client

If half of the terms here don’t mean anything to you, no problem: we clarify everything in this article and link good references for further reading along the way.

The repository that contains the final code can be found here: https://github.com/AICDEV/s3_streaming_with_golang

The Project

Data

Now that we know what we want to build, we still need data. Let’s use a simple example: transaction data, i.e. the account transactions of a user. An example data set could look like this:

User:
 - email
 - ID 
 - []Transaction

 Transaction
  - amount
  - currency
  - recipient
  - reason
  - verification_key

For our S3 storage this means that we create a folder for each user-id, in which we store the serialized transactions (500 per file). It looks like this:

<bucket>/<user-id>/transactions

For our application this means that we have a gRPC function “GetUserTransactions” which expects the user-id as a parameter, loads the data from S3, and sends it to the client as a stream from a goroutine.
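To make the storage layout concrete, here is a minimal sketch of how the object keys for a user’s chunks could be derived. The `chunkKeys` helper and the `user-42` id are illustrative, not part of the repository:

```go
package main

import "fmt"

// chunkKeys builds the S3 object keys for a user's transaction chunks,
// following the <user-id>/transaction-<offset> layout described above.
func chunkKeys(userID string, total, chunkSize int) []string {
	var keys []string
	for i := 0; i < total; i += chunkSize {
		keys = append(keys, fmt.Sprintf("%s/transaction-%d", userID, i))
	}
	return keys
}

func main() {
	// 1200 transactions in chunks of 500 -> three objects.
	for _, k := range chunkKeys("user-42", 1200, 500) {
		fmt.Println(k)
	}
}
```

Each object holds one serialized chunk, so the server can later download and forward the chunks one by one instead of waiting for the whole data set.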

Protocol Buffer and gRPC definition

A proposal for a protocol buffer definition:

syntax = "proto3";

option go_package = "github.com/aicdev/s3_streaming_with_golang";

package transactions;

message User {
    string email = 1;
    string id = 2;
    repeated Transaction transactions = 3;
}

message TransactionCollection {
    repeated Transaction transactions = 1;
}

message Transaction {
    double anmount = 1;
    enum Currency {
        EUR = 0;
        USD = 1;
    }

    Currency currency = 2;
    string recipient = 3;
    string reason = 4;
    string verificationKey = 5;
    string userId = 6;
}

service TransactionService {
    rpc GetUserTransactions(User) returns (stream Transaction) {}
}

Now that we have defined our protocol buffer and our service (rpc), we still need to compile our definition to Go. For this we first create our Go project. Please make sure that you have installed the following dependencies (if you run into trouble with Go and protocol buffers, check: https://grpc.io/docs/languages/go/quickstart/):

go get -u google.golang.org/protobuf/cmd/protoc-gen-go
go get -u google.golang.org/grpc/cmd/protoc-gen-go-grpc
go get -u google.golang.org/grpc 

Next step is to create our build script. Therefore we create the file “build_golang_proto.sh” and a folder “proto” inside our project folder and paste the following content into the build_golang_proto.sh file:

protoc -I . --go_out=./proto --go_opt=paths=source_relative --go-grpc_out=./proto --go-grpc_opt=paths=source_relative transaction.proto

Then run the script and look at the output inside the ./proto folder. I recommend reading through the generated code; this image is only a short preview:

After that my project folder looks like this:

AWS S3 Bucket

In AWS we need 2 things:

  • A user or API credentials to connect to S3 programmatically.
  • A bucket

For the first topic just follow the official docs from AWS: https://docs.aws.amazon.com/powershell/latest/userguide/pstools-appendix-sign-up.html

For the bucket just search for S3 in the aws service catalog and click on it:

Then click on the big orange button “create bucket” and enter your bucket options. My bucket looks like this:

And after creation you should see something like this:

Demo data creation

Time for the first lines of code. Let’s start by creating a few thousand test transactions. But before we start: we want to control our program via command line arguments to set a mode.

--mode=test = test data generation
--mode=stream = streaming from s3

To do this we still need to install the following Go module: https://github.com/alecthomas/kingpin

go get gopkg.in/alecthomas/kingpin.v2

My test data generator looks like this:

package data

import (
	"crypto/sha512"
	"encoding/hex"
	"fmt"
	"math/rand"
	"time"

	pb "github.com/aicdev/s3_streaming_with_golang/proto"
)

const letters = "abcdefghijklmnopqrstuvwxyz"

type TestDataServiceInterface interface {
	CreateTestData(int)
	GetTestData() []*pb.User
}

type testDataService struct {
	testData []*pb.User
}

func NewTestDataService() TestDataServiceInterface {
	rand.Seed(time.Now().Unix())
	return &testDataService{}
}

func (tsd *testDataService) CreateTestData(limit int) {
	for i := 0; i < limit; i++ {
		email := fmt.Sprintf("%s@%s.%s", getRandomStringBytes(8), getRandomStringBytes(4), getRandomStringBytes(2))
		testUser := &pb.User{
			Email: email,
			Id:    GetHash(email),
		}

		tsd.createTestTransactions(testUser)
		tsd.testData = append(tsd.testData, testUser)
	}
}

func (tsd *testDataService) GetTestData() []*pb.User {
	return tsd.testData
}

func (tsd *testDataService) createTestTransactions(testUser *pb.User) {
	for i := 0; i < 5000; i++ {
		testUser.Transactions = append(testUser.Transactions, &pb.Transaction{
			Anmount:         rand.Float64(),
			Recipient:       fmt.Sprintf("%s@%s.%s", getRandomStringBytes(8), getRandomStringBytes(4), getRandomStringBytes(2)),
			Reason:          getRandomStringBytes(20),
			VerificationKey: fmt.Sprintf("%s-%s", getRandomStringBytes(4), getRandomStringBytes(8)),
			UserId:          testUser.GetId(),
			Currency:        pb.Transaction_USD,
		})
	}
}

// inspired from https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-go
func getRandomStringBytes(limit int) string {
	b := make([]byte, limit)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

func GetHash(t string) string {
	sum := sha512.Sum512([]byte(t))
	return hex.EncodeToString(sum[:])
}
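One property of this scheme worth noting: since the user-id is the hex-encoded SHA-512 of the email, the S3 folder name is always 128 characters long. A minimal stand-alone check — the `userID` helper and the example address are illustrative, not from the repository:

```go
package main

import (
	"crypto/sha512"
	"encoding/hex"
	"fmt"
)

// userID derives the S3 folder name exactly like GetHash above:
// the hex-encoded SHA-512 digest of the email address.
func userID(email string) string {
	sum := sha512.Sum512([]byte(email))
	return hex.EncodeToString(sum[:])
}

func main() {
	// SHA-512 produces 64 bytes, so the hex encoding is 128 characters.
	fmt.Println(len(userID("alice@example.com"))) // 128
}
```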

Let’s test if we get some garbage data. Therefore we need to create a new TestDataService and call the generation:

package app

import (
	"fmt"
	"log"
	"os"

	"github.com/aicdev/s3_streaming_with_golang/data"
	"gopkg.in/alecthomas/kingpin.v2"
)

var (
	mode = kingpin.Flag("mode", "mode to start application").Default("stream").String()
)

func StartApplication() {
	kingpin.Parse()
	log.Printf("starting streaming service in mode: %s", *mode)
	if *mode == "test" {
		runTestDataCreation()
	}

}

func runTestDataCreation() {

	testDataCreator := data.NewTestDataService()
	testDataCreator.CreateTestData(5)

	for _, v := range testDataCreator.GetTestData() {
		fmt.Println(v.GetTransactions())
	}
	os.Exit(0)
}


Then start the script by running:

go run main.go --mode=test    

And we get some nice garbage output:

Now that we have our test data we still need to store it in AWS S3.

Save data in AWS S3

One important resource to read is the AWS SDK documentation: https://aws.amazon.com/sdk-for-go/?nc1=h_ls and the getting started guide where the installation is documented: https://aws.github.io/aws-sdk-go-v2/docs/getting-started/. Note that my example is not based on API version 2, but it should still work.

In short, to install, run:

go get github.com/aws/aws-sdk-go/aws
# if there are any weird behaviours run 
# go mod tidy

And to parse environment variables in a nice way install:

go get github.com/kelseyhightower/envconfig

My AWS session handler looks like this:

package cloud

import (
	"log"

	"github.com/aicdev/s3_streaming_with_golang/env"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
)

type AwsServiceInterface interface {
	GetSession() *session.Session
}

type awsService struct {
	Session *session.Session
}

var (
	AwsService AwsServiceInterface = &awsService{}
)

func (as *awsService) GetSession() *session.Session {
	if as.Session != nil {
		return as.Session
	}

	parsedEnv, _ := env.ParseEnvironmentVariables()

	sess, err := session.NewSession(&aws.Config{
		Region:      &parsedEnv.S3Region,
		Credentials: credentials.NewEnvCredentials(),
	})

	if err != nil {
		log.Fatal(err)
	}

	as.Session = sess

	return as.Session
}

And the environment variable parser:

package env

import (
	"github.com/kelseyhightower/envconfig"
)

type Streamingnvironment struct {
	Port         string `required:"true" default:"9999"`
	S3Region     string `required:"true" default:"eu-central-1"`
	S3BucketName string `required:"true"`
}

func ParseEnvironmentVariables() (*Streamingnvironment, error) {
	env := &Streamingnvironment{}
	if err := envconfig.Process("streaming_example", env); err != nil {
		return env, err
	}

	return env, nil
}

The next important step: when we start our application, we check whether all required environment variables are set. For this we use the init function in our app.go and add that little piece of code:

package app

import (
	"fmt"
	"log"
	"os"

	"github.com/aicdev/s3_streaming_with_golang/data"
	"github.com/aicdev/s3_streaming_with_golang/env"
	"gopkg.in/alecthomas/kingpin.v2"
)

var (
	mode = kingpin.Flag("mode", "mode to start application").Default("stream").String()
)

func init() {
	_, err := env.ParseEnvironmentVariables()
	if err != nil {
		log.Fatal(err.Error())
	}
}

func StartApplication() {
	kingpin.Parse()
	log.Printf("starting streaming service in mode: %s", *mode)
	if *mode == "test" {
		runTestDataCreation()
	}

}

func runTestDataCreation() {

	testDataCreator := data.NewTestDataService()
	testDataCreator.CreateTestData(5)

	for _, v := range testDataCreator.GetTestData() {
		fmt.Println(v.GetTransactions())
	}
	os.Exit(0)
}

Next step is to implement our uploader. Notice that the uploader has some “magic” inside. In order to stream thousands of transactions, we “chunk” them with a maximum size of 500. That means we serialize every batch of 500 transactions and store it in the user’s folder in S3. So instead of downloading maybe 50MB of user data and sending it back to the client in one go, we can start streaming each part as soon as it is downloaded.

package uploader

import (
	"bytes"
	"fmt"

	"log"

	cloud "github.com/aicdev/s3_streaming_with_golang/aws"
	"github.com/aicdev/s3_streaming_with_golang/env"
	pb "github.com/aicdev/s3_streaming_with_golang/proto"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"google.golang.org/protobuf/proto"
)

type UploaderServiceInterface interface {
	UploadTestData(*pb.User)
}

type uploaderService struct {
	collectionChunkSize int
}

func NewUploaderService() UploaderServiceInterface {
	return &uploaderService{
		collectionChunkSize: 500,
	}
}

func (ups *uploaderService) UploadTestData(user *pb.User) {
	transactions := user.GetTransactions()
	// Walk the slice in steps of collectionChunkSize (500) and upload each
	// chunk as its own serialized TransactionCollection. Note that we use
	// len, not cap: cap reports the capacity of the underlying array, which
	// may be larger than the actual number of transactions.
	for i := 0; i < len(transactions); i += ups.collectionChunkSize {
		end := i + ups.collectionChunkSize
		if end > len(transactions) {
			end = len(transactions)
		}

		tc := &pb.TransactionCollection{
			Transactions: transactions[i:end],
		}

		raw, err := proto.Marshal(tc)
		if err != nil {
			log.Println(err.Error())
			continue
		}

		ups.upload(user.GetId(), fmt.Sprintf("transaction-%d", i), raw)
	}
}

func (ups *uploaderService) upload(userId string, key string, raw []byte) {
	parsedEnv, _ := env.ParseEnvironmentVariables()

	session := cloud.AwsService.GetSession()
	up := s3manager.NewUploader(session)

	_, err := up.Upload(&s3manager.UploadInput{
		Bucket: aws.String(parsedEnv.S3BucketName),
		Key:    aws.String(fmt.Sprintf("%s/%s", userId, key)),
		Body:   bytes.NewReader(raw),
	})

	if err != nil {
		log.Println(err.Error())
	}
}
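To see how the chunking plays out: with the 5000 test transactions generated per user and a chunk size of 500, the loop above produces ten objects. A stand-alone sketch of the slicing logic, with no AWS involved — the `chunk` helper is illustrative, not from the repository:

```go
package main

import "fmt"

// chunk splits a slice into pieces of at most size elements,
// mirroring the slicing in UploadTestData above.
func chunk(items []int, size int) [][]int {
	var out [][]int
	for i := 0; i < len(items); i += size {
		end := i + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[i:end])
	}
	return out
}

func main() {
	parts := chunk(make([]int, 5000), 500)
	fmt.Println(len(parts), len(parts[0])) // 10 500
}
```

The last chunk simply ends up shorter when the total is not a multiple of 500, so no transaction is dropped or duplicated.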

And call the uploader from our app.go


package app

import (
	"log"
	"os"

	"github.com/aicdev/s3_streaming_with_golang/data"
	"github.com/aicdev/s3_streaming_with_golang/env"
	"github.com/aicdev/s3_streaming_with_golang/uploader"
	"gopkg.in/alecthomas/kingpin.v2"
)

var (
	mode = kingpin.Flag("mode", "mode to start application").Default("stream").String()
)

func init() {
	_, err := env.ParseEnvironmentVariables()
	if err != nil {
		log.Fatal(err.Error())
	}
}

func StartApplication() {
	kingpin.Parse()
	log.Printf("starting streaming service in mode: %s", *mode)
	switch *mode {
	case "test":
		runTestDataCreation()

	case "stream":
		bootRPCService()

	default:
		log.Fatalf("unknown mode: %s", *mode)
	}
}

func runTestDataCreation() {

	uploader := uploader.NewUploaderService()

	testDataCreator := data.NewTestDataService()
	testDataCreator.CreateTestData(1)

	for _, v := range testDataCreator.GetTestData() {
		uploader.UploadTestData(v)
	}
	os.Exit(0)
}

func bootRPCService() {

}

Let’s run our application by calling (make sure that you have your AWS access credentials set as environment variables):

go run main.go --mode=test

The result in your bucket should look like this:

Summary

Let’s recap what we have done so far.

  • We have created a protocol buffer definition with an rpc service definition and compiled it to Go.
  • We have created an S3 bucket in AWS.
  • We have implemented a test data creation mode and can store each user’s transaction data as serialized bytes in S3.

Stream data from AWS S3

Now that we have test data, we can implement our RPC server and send the data from AWS as a stream to the client. Therefore we need a downloader that is able to download entries in a user folder, and a streaming service that deserializes the downloader response and pushes the result to the client stream.

My downloader looks like this:

package downloader

import (
	cloud "github.com/aicdev/s3_streaming_with_golang/aws"
	"github.com/aicdev/s3_streaming_with_golang/env"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

type DownloaderServiceInterface interface {
	DownloadFromS3(string) ([]byte, error)
}

type downloadService struct {
	env *env.Streamingnvironment
}

func NewDownloaderService() DownloaderServiceInterface {
	parsedEnv, _ := env.ParseEnvironmentVariables()
	return &downloadService{
		env: parsedEnv,
	}
}

func (dw *downloadService) DownloadFromS3(key string) ([]byte, error) {

	awsSession := cloud.AwsService.GetSession()
	dwm := s3manager.NewDownloader(awsSession, func(d *s3manager.Downloader) {
		d.Concurrency = 10
	})

	raw := &aws.WriteAtBuffer{}
	_, err := dwm.Download(raw, &s3.GetObjectInput{
		Bucket: aws.String(dw.env.S3BucketName),
		Key:    aws.String(key),
	})

	if err != nil {
		return nil, err
	}

	return raw.Bytes(), nil
}

And my streaming service:

package services

import (
	"log"

	cloud "github.com/aicdev/s3_streaming_with_golang/aws"
	"github.com/aicdev/s3_streaming_with_golang/downloader"
	"github.com/aicdev/s3_streaming_with_golang/env"
	pb "github.com/aicdev/s3_streaming_with_golang/proto"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"google.golang.org/protobuf/proto"
)

type StreamingServiceInterface interface {
	Stream(string, chan *pb.TransactionCollection)
}

type streamingService struct{}

var (
	StreamingService StreamingServiceInterface = &streamingService{}
)

func (ss *streamingService) Stream(id string, c chan *pb.TransactionCollection) {
	parsedEnv, _ := env.ParseEnvironmentVariables()
	sc := cloud.AwsService.GetSession()

	bucket_client := s3.New(sc)

	var continueToken *string

	for {
		resp, err := bucket_client.ListObjectsV2(&s3.ListObjectsV2Input{
			Bucket:            aws.String(parsedEnv.S3BucketName),
			Prefix:            aws.String(id),
			ContinuationToken: continueToken,
		})

		if err != nil {
			log.Fatal(err)
		}

		downloader := downloader.NewDownloaderService()
		for _, key := range resp.Contents {

			rawBytes, err := downloader.DownloadFromS3(*key.Key)

			if err != nil {
				log.Fatal(err)
			}

			transactionCollection := &pb.TransactionCollection{}
			err = proto.Unmarshal(rawBytes, transactionCollection)

			if err != nil {
				log.Fatal(err)
			}

			if len(transactionCollection.GetTransactions()) > 0 {
				c <- transactionCollection
			}
		}

		if !aws.BoolValue(resp.IsTruncated) {
			break
		}

		continueToken = resp.NextContinuationToken
	}

	close(c)
}
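The post does not show the gRPC handler that consumes this channel, so here is a self-contained sketch of the pattern: drain the channel and forward each transaction individually, matching the `returns (stream Transaction)` signature from the proto definition. The types below are simplified stand-ins for the generated pb types and the server stream, not the real generated code:

```go
package main

import "fmt"

// Stand-ins for the generated protobuf types (assumptions for illustration).
type Transaction struct{ Reason string }
type TransactionCollection struct{ Transactions []*Transaction }

// Sender mimics the Send method of the generated server-side stream.
type Sender interface{ Send(*Transaction) error }

type countingSender struct{ count int }

func (c *countingSender) Send(t *Transaction) error {
	c.count++
	return nil
}

// forward drains the channel filled by StreamingService.Stream and pushes
// each transaction of every collection onto the stream individually.
func forward(c chan *TransactionCollection, s Sender) error {
	for collection := range c {
		for _, t := range collection.Transactions {
			if err := s.Send(t); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	c := make(chan *TransactionCollection)
	go func() {
		// The producer side is what Stream does: push collections, then close.
		c <- &TransactionCollection{Transactions: []*Transaction{{Reason: "a"}, {Reason: "b"}}}
		close(c)
	}()
	s := &countingSender{}
	forward(c, s)
	fmt.Println(s.count) // 2
}
```

In the real handler, `forward` would run inside `GetUserTransactions` with the generated stream as the Sender, while `StreamingService.Stream` runs in a goroutine feeding the channel.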

Implement a simple test client

Now it’s time to test our implementation. Therefore we create a simple rpc client:

package main

import (
	"context"
	"io"
	"log"
	"time"

	pb "github.com/aicdev/s3_streaming_with_golang/proto"
	"google.golang.org/grpc"
)

func main() {
	con, err := grpc.Dial("localhost:9999", grpc.WithInsecure())

	if err != nil {
		log.Fatalf("connection error: %v", err)
	}

	defer con.Close()

	c := pb.NewTransactionServiceClient(con)

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	stream, err := c.GetUserTransactions(ctx, &pb.User{
		Id: "acb03ff98e576ec4b5437d791d85d98c523821ecb61457668928f6b4c204ab944f5be2ca3605359d3a41aceb4946151a56fa16fdc09161e8a65129c19b4da83b",
	})

	if err != nil {
		log.Fatalf("unable to get data: %v", err)
	}

	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Fatalf("cannot receive %v", err)
		}
		log.Printf("Resp received: %s", resp)
	}
}

Call the client:

go run client.go

And watch the output stream:

Summary

I hope this post has shown one or the other how to work with gRPC, AWS S3 and Go. Please keep in mind that these are only code fragments and make no claim to be production-ready; proper error handling, for example, is missing. But feel free to play around with, modify and copy pieces of the code.

Happy streaming.
