[go] 내가 만든 kafka-connector to kinesis, 통칭 doka-connector
Golang으로 공부를 하던찰나, 이제 드디어 공부하던것이 빛을 발할때가 왔다!
통칭 doka-connector, dobby-kafka-connector이다

왜 doka를 만들었을까?
데이터팀에서 최근 개발팀에 MSK에 특정 토픽에 메시지를 만들어 달라고 요청을 했다. 그리고 해당 메시지를 S3에 적재하고 싶어했다.
처음에는 MSK에 메시지를 쌓으면 데이터팀 S3에 적재할 수 있겠다. 쉽겠다.라고 생각했는데 여기에는 생각치 못한 문제가 있었다.
- 개발/데이터 Account가 분리되어 있다. 그리고 Account끼리 VPC 피어링/TransitGateway 연결이 안되어 있다.
- MSK는 Private Subnet에 배포되어 있어 CrossAccount에서 접근할 수가 없다.
그래서 MSK의 메시지를 AWS 다른 Account Kinesis에 메시지를 적재할 수 있는 kafka proxy가 필요했다!!
처음에는 Confluent kafka-connector/AWS 에서 제공해주는 kafka-connector를 사용하려고 했었는데, 회사의 니즈랑은 맞지 않았다. 온전히 사용할 수 없고 결국에는 수정이 필요할것 같아서, 직접 구현하기로 마음먹었다!!
- 단순히 MSK 메시지를 토픽만으로 끝나는게 아니라, 토픽을 구독하고 다른 Account Role Assume을 하고 Kinesis Stream에 Put을 해야했다!
어떻게 구성되어 있을까?
전체적인 흐름은 아래 그림이다. 기본적으로 현재 사용중인 Cluster 위에 배포하는걸 기본으로 했다.
kafka-connector는 기본적으로 A 계정의 EKS에 배포되어 있다.
- EKS에 배포했기 때문에 ServiceAccount을 사용하여 A 계정의 Role을 할당받는다.
- 그리고 A 계정의 Role에는 B 계정의 특정 Role을 Assume 할 수 없는 권한을 추가하고, B 계정에서 신뢰관계를 추가한다.
- 그리고 B 계정의 역할에서 Kinesis 권한을 추가하면 된다.
그리고 A 계정의 MSK의 특정 토픽을 구독하고, 메시지를 받아서 B 계정의 특정 Kinesis로 Put을 하면 된다.
어떻게 동작할까?
중요한 로직은 msk 메시지를 consume하고, kinesis produce를 하고, msk commit을 하는 것이다.
핵심은 msk 메시지를 consume을 하는것이고, 여러가지 패키지를 검색해보면, 결과 Shopify sarama 패키지를 사용하기로 했다.
- sarama를 사용한 이유는 거창하지 않다. 가장 많이 검색이 되었기 때문이였다ㅋㅋㅋ
그리고 각 환경별로 서로 다른 Role을 사용하기 때문에 Viper 패키지로 Config Profile을 사용하기로 했다.
외부 요청을 받을 필요가 없기 때문에 웹 프레임워크는 사용하지 않았다.
MSK Consumer
Medium 글을 많이 참고했다. 이미 비슷한 고민을 하는 사람들이 구현하는 방법을 많이 소개하고 있다.
그래서 기본 틀을 가져오고, 기본 틀에서 회사의 니즈를 일부 넣어서 구현했다. Sarama를 사용하기 위해서 기본적으로 Setup, Cleanup, ConsumeClaim 메서드를 사용하기 때문에 이름을 변경하면 동작하지 않는다.
그리고 중요한건 ConsumeClaim에서 내부적으로 Goroutine을 사용하기 때문에 내가 별도로 구현하지 않아도 된다는 편의가 있다!!
MSK Consumer에서 중요한것은 메시지를 읽었으면, 읽었다고 Mark(MSK Commit)을 하는것이다.
- 처음에는 MarkMessage가 없었는데 이게 없으니 MSK Lag이 줄어들지 않아서 당황했다;;
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error { return nil }
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error { return nil }
// MSK 토픽 메시지를 소비하고, Kinesis PutRecord
// sarama에서 제공해주는 ConsumerClaim에서 내부적으로 goroutine을 사용함
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for messege := range claim.Messages() {
messageValues := string(messege.Value)
// FIXME: kinesis putrecord 하는 부분을 별도의 goroutine으로 분리
// Kienesis PutRecord 로직
c.recordMessage(messageValues)
session.MarkMessage(messege, "")
}
return nil
}
// MSK 토픽에 Consumer Group 생성
func (c Consumer) CreateConsumer() (sarama.ConsumerGroup, string) {
config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest
client, err := sarama.NewClient(c.conf.GetStringSlice("msk.brokers"), config)
if err != nil {
loggers.GlobalLogger.Fatal("fail to create msk topic consume:", err)
}
consumerGroup, err := sarama.NewConsumerGroupFromClient(c.conf.GetString("msk.groupId"), client)
if err != nil {
loggers.GlobalLogger.Fatal("can not create msk consumergroup:", err)
}
return consumerGroup, c.conf.GetString("msk.topic")
}
// Streamer Layer의 PutStreamData 함수를 호출/MSK 메시지를 Kinesis Put
func (c Consumer) recordMessage(message string) {
c.streamer.PutStreamData(message)
}
CrossAccount Assume
CrossAccount Role Assume을 하기 위해서는 현재 배포되어 있는 Pod의 ServiceAccount에 대해 먼저 Client를 만들어야 한다.
- getOriginalSession
그리고 CrossAccount Assume을 하기 위해 Sts 임시 토큰을 받고, Kinesis Client를 생성했다.
이것만 본다면 크게 어려운 부분이 없다. 그리고 실제로도 Assume, STS, Kinesis Client를 만드는것은 예시코드가 아주 많다ㅋㅋ
그중 일부를 가져오고, 입맛에 맞춰 사용하면 되는것이다.
// A Account의 Service Account Assume
func getOriginalSession() (aws.Config, error) {
session, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("ap-northeast-2"))
if err != nil {
return session, err
}
return session, nil
}
// A Account Role을 사용해서 B Account Role STS Client 생성
func createCrossSession(session aws.Config) aws.CredentialsProviderFunc {
conf := configs.GetConfig()
stsClient := sts.NewFromConfig(session)
roleSessionName := fmt.Sprintf("doka-%s-%s", viper.GetString("ENV"), uuid.NewString())
stsInput := &sts.AssumeRoleInput{
RoleArn: aws.String(conf.GetString("aws.crossAccountArn")),
RoleSessionName: aws.String(roleSessionName),
}
stsOutput, err := stsClient.AssumeRole(context.TODO(), stsInput)
if err != nil {
loggers.GlobalLogger.Fatal(err)
}
//
crossAccountSts := aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) {
return aws.Credentials{
AccessKeyID: *stsOutput.Credentials.AccessKeyId,
SecretAccessKey: *stsOutput.Credentials.SecretAccessKey,
SessionToken: *stsOutput.Credentials.SessionToken,
Source: conf.GetString("aws.crossAccountArn"),
}, nil
})
return crossAccountSts
}
// B Account Role을 사용해서 Kinesis Client 생성
func createCrossKinesisSession(crossAccountSts aws.CredentialsProviderFunc) (*kinesis.Client, error) {
conf := configs.GetConfig()
kinesisConfig := aws.Config{
Region: conf.GetString("aws.region"),
Credentials: aws.NewCredentialsCache(crossAccountSts),
}
kinesisClient := kinesis.NewFromConfig(kinesisConfig)
return kinesisClient, nil
}
Kinesis Produce
그리고 가장 마지막으로 CrossAccount STS로 생성한 Kinesis Client로 PutRecord를 하면 된다!
// Kinesis PutRecord
func (s *Streamer) PutStreamData(message string) error {
streamName := s.conf.GetString("aws.kinesis.streamName")
partitionKey := fmt.Sprintf("%s-%s", s.conf.GetString("aws.kinesis.partitionKey"), uuid.NewString())
input := &kinesis.PutRecordInput{
Data: []byte(message),
StreamName: &streamName,
PartitionKey: aws.String(partitionKey),
}
_, err := s.kinesisClient.PutRecord(context.TODO(), input)
if err != nil {
// TODO: 에러 메트릭 생산 및 수집해서 모니터링 강화
loggers.GlobalLogger.Printf("fail to put record: %v", err)
}
return nil
}
배포는 어떻게 할까?
k8s 배포
배포에 필요한 것도 아주 간단한다. MSK에 접근하고 Assume -> Kinesis Put만 하면 되기 때문에 Deployment 그리고 ServiceAccount을 제외하면 다른 Kind(Service, Ingress)는 배포할 필요가 없다.
아래 예시코드에서 Image, ServiceAccount Annotation만 변경하면 된다.
- 필자는 ECR에 doka 이미지를 업로드 했기 때문에 Image를 ECR Path로 했다.
apiVersion: apps/v1
kind: Deployment
metadata:
name: doka-connector
namespace: doka-connector
spec:
replicas: 1
selector:
matchLabels:
app: doka-connector
template:
metadata:
labels:
app: doka-connector
spec:
containers:
- name: kafka-connector
# AWS Account ID를 각자에 맞춰서 업데이트하면 됩니다.
image: xxxxxxx.dkr.ecr.ap-northeast-2.amazonaws.com/doka-connector:latest
imagePullPolicy: Always
ports:
- containerPort: 8083
serviceAccount: doka-connector
serviceAccountName: doka-connector
---
apiVersion: v1
kind: ServiceAccount
metadata:
annotations:
# AWS Account ID를 각자에 맞춰서 업데이트하면 됩니다.
eks.amazonaws.com/role-arn: arn:aws:iam::xxxxxxx:role/doka-connector
name: doka-connector
namespace: doka-connector
최종적으로는 어떻게 될까?
최종적으로는 아래와 같이 MSK 토픽을 보면 Consumer Group에 잘 등록되어 있고, 메시지가 들어올 때 Lag이 올라가고/메시지가 소비되면 Lag이 잘 줄어든다. 야호~
그리고 Kinesis에도 보면 PutRecord 지표도 잘 올라가는것 같다~
개발에 소요된 시간은 한 1주일정도 걸린것 같다. 이거에만 집중하면 훨씬 더 시간이 적게 걸리겠지만 다른 업무도 진행하고/팀원분들 업무 리뷰도 해야하고.. 실질적으로 개발에 할애한 시간이 많지 않았던거 같다ㅜㅜ
다른 회사의 Devops팀분들이랑도 이야기해보면, 플랫폼 엔지니어링/내부 플랫폼 개발을 하는 조직이 있기는 한대 실제로는 개발팀 요청사항/인프라 유지관리/EKS 유지관리 등으로 개발에 할애하는 시간은 많지 않다고 한다.
뭔가 공부를 하고, 실제 필요한 것을 만들고/사용했다는 것에 성취감이 있어서 좋았다.
전체 코드는 아래 Github에 모두 공개해놨다. 아직 부족한 실력이지만 분명 더 좋은 방법이 있을것이고/더 개선할 수 있는 여지는 분명하다.
업무에 필요한 부분이 있으면 자유롭게 사용하시면 됩니다~ 그리고 개선할 수 있는 것은 PR로 올려주시면 감사합니다: ) bb
GitHub - dongjin-park92/doka-connector: MSK message consume and produce crossaccount kinesis steam
MSK message consume and produce crossaccount kinesis steam - GitHub - dongjin-park92/doka-connector: MSK message consume and produce crossaccount kinesis steam
github.com
앞으로도 개발을 하고 공부를 하고 조금씩 성장해보자!!

끝