- Golang
golang.org/x/sync 살펴보기
golang.org/x/sync의 유용한 sync 유틸리티 소개
2020. 1. 19
golang.org/x/sync 에는 유용한 동기화 유틸리티가 있다.
- errgroup: 동기화, 에러 전파, 컨텍스트 취소를 고루틴 그룹에 제공한다.
- semaphore: 가중치 세마포어 구현을 제공한다.
- singleflight: 중복 함수 호출을 억제하는 메카니즘을 제공한다.
- syncmap: 동시성 지원하는 맵 구현체를 제공한다.
std
의sync.Map
과 같다
4개 중 이 글에서는 errgroup
, singleflight
를 소개한다.
golang.org/x/sync/errgroup
errgroup
은 동시에 실행될 여러 고루틴 중 하나라도 실패하면 전부 취소하는 경우에 쓴다.
(자바스크립트의 Promise.all
이랑 비슷하다.)
키 여러개를 가지고 해당하는 제품 정보를 모두 가져와야 할 때 errgroup
없이는 아래처럼 프로그래밍하게 된다.
func (s *Service) FetchProducts(ctx context.Context, productIDs ...string) (ress []*Product, err error) {
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup // WaitGroup 생성
ress = make([]*Product, len(productIDs))
for i, k := range productIDs {
i, k := i, k
go func() {
wg.Add(1) // 기다릴 작업 하나 추가
defer wg.Done() // 작업 완료를 defer
ress[i], err = s.db.GetProductByID(ctx, k) // 실제 작업
if err != nil { // 만약 오류 발생했다면?
cancel() // 다른 고루틴의 빠른 종료를 위해 cancel 호출
}
}
}
wg.Wait() // 고루틴이 모두 끝날 때까지 대기
if err != nil {
return nil, err
}
return ress, nil
}
errgroup
이 있으면 간단하게 할 수 있다.
import (
// ...
"golang.org/x/sync/errgroup"
)
func (s *Service) FetchProducts(ctx context.Context, productIDs ...string) (ress []*Product, err error) {
eg, ctx := errgroup.WithContext(ctx) //
ress = make([]*Product, len(productIDs))
for i, k := range productIDs {
i, k := i, k
eg.Go(func() (err error) { // eg.Go 함수로 고루틴 실행
ress[i], err = s.db.GetProductByID(ctx, k)
return err
})
}
if err = eg.Wait(); err != nil { // 대기
return nil, err
}
return ress, nil
}
errgroup.WithContext
로 생성된 컨텍스트는 eg.Wait()
이 끝났을 때 cancel()
된다는 점을 주의해야 한다.
golang.org/x/sync/singleflight
singleflight.Group
은 어떤 비동기 함수가 실행중일 때, 동일한 키로 해당 함수를 실행하면 새 요청을 보내는 대신
기존의 요청을 기다린다.
+-+ fetchProduct("123")
|
| +--+ fetchProduct("201")
| |
| | +-+ fetchProduct("123")
| | |
| | | +-+ fetchProduct("123")
| | | |
v | v v
+----------------------+ +----------------------+
key "123" | fetchProduct("123") | | fetchProduct("123") |
+----------------------+ +----------------------+
t0 | t2 t0` t3 t3`
+-----------------------+
key "201" | fetchProduct("201") |
+-----------------------+
t1 t1`
-------------------------------------------> time
아래 코드처럼 실행된다.
func TestFetchSomeProducts(t *testing.T) {
var g singleflight.Group
ctx := context.Background()
logResult := func(id string) {
t.Log(g.Do(id, func() (interface{}, error) {
return db.GetProductByID(ctx, id)
}))
}
// t0
go logResult("123")
// wait until t1
go logResult("201")
// wait until t2
go logResult("123")
// when t0`
// &Product{ID: "123"}, nil, true
// &Product{ID: "123"}, nil, true
// wait until t3
go logResult("123")
// when t1`
// &Product{ID: "201"}, nil, false
// when t3`
// &Product{ID: "123"}, nil, false
}
응답이 캐싱되지는 않는다. 키에 대한 작업을 이미 한번 처리했더라도 진행중인 요청이 없다면 새로 전송한다.