• Golang

golang.org/x/sync 살펴보기

golang.org/x/sync의 유용한 sync 유틸리티 소개

2020. 1. 19

golang.org/x/sync 에는 유용한 동기화 유틸리티가 있다.

  • errgroup: 동기화, 에러 전파, 컨텍스트 취소를 고루틴 그룹에 제공한다.
  • semaphore: 가중치 세마포어 구현을 제공한다.
  • singleflight: 중복 함수 호출을 억제하는 메카니즘을 제공한다.
  • syncmap: 동시성 지원하는 맵 구현체를 제공한다. stdsync.Map과 같다

4개 중 이 글에서는 errgroup, singleflight를 소개한다.

golang.org/x/sync/errgroup

errgroup은 동시에 실행될 여러 고루틴 중 하나라도 실패하면 전부 취소하는 경우에 쓴다. (자바스크립트의 Promise.all이랑 비슷하다.)

키 여러개를 가지고 해당하는 제품 정보를 모두 가져와야 할 때 errgroup 없이는 아래처럼 프로그래밍하게 된다.

func (s *Service) FetchProducts(ctx context.Context, productIDs ...string) (ress []*Product, err error) {
  ctx, cancel := context.WithCancel(ctx)

  var wg sync.WaitGroup // WaitGroup 생성

  ress = make([]*Product, len(productIDs))
  for i, k := range productIDs {
    i, k := i, k
    go func() {
      wg.Add(1) // 기다릴 작업 하나 추가
      defer wg.Done() // 작업 완료를 defer
      ress[i], err = s.db.GetProductByID(ctx, k) // 실제 작업
      if err != nil { // 만약 오류 발생했다면?
        cancel() // 다른 고루틴의 빠른 종료를 위해 cancel 호출
      }
    }
  }
  wg.Wait() // 고루틴이 모두 끝날 때까지 대기

  if err != nil {
    return nil, err
  }
  return ress, nil
}

errgroup이 있으면 간단하게 할 수 있다.

import (
  // ...
  "golang.org/x/sync/errgroup"
)

func (s *Service) FetchProducts(ctx context.Context, productIDs ...string) (ress []*Product, err error) {
  eg, ctx := errgroup.WithContext(ctx) //
  ress = make([]*Product, len(productIDs))
  for i, k := range productIDs {
    i, k := i, k
    eg.Go(func() (err error) { // eg.Go 함수로 고루틴 실행
      ress[i], err = s.db.GetProductByID(ctx, k)
      return err
    })
  }
  if err = eg.Wait(); err != nil { // 대기
    return nil, err
  }
  return ress, nil
}

errgroup.WithContext로 생성된 컨텍스트는 eg.Wait()이 끝났을 때 cancel()된다는 점을 주의해야 한다.

golang.org/x/sync/singleflight

singleflight.Group은 어떤 비동기 함수가 실행중일 때, 동일한 키로 해당 함수를 실행하면 새 요청을 보내는 대신 기존의 요청을 기다린다.

            +-+ fetchProduct("123")
            |
            |     +--+ fetchProduct("201")
            |     |
            |     |     +-+ fetchProduct("123")
            |     |     |
            |     |     |             +-+ fetchProduct("123")
            |     |     |             |
            v     |     v             v
            +----------------------+  +----------------------+
  key "123" |  fetchProduct("123") |  |  fetchProduct("123") |
            +----------------------+  +----------------------+
            t0    |     t2        t0`  t3                   t3`
                  +-----------------------+
  key "201"       |  fetchProduct("201")  |
                  +-----------------------+
                  t1                    t1`

            -------------------------------------------> time

아래 코드처럼 실행된다.

func TestFetchSomeProducts(t *testing.T) {
  var g singleflight.Group
  ctx := context.Background()

  logResult := func(id string) {
    t.Log(g.Do(id, func() (interface{}, error) {
      return db.GetProductByID(ctx, id)
    }))
  }

  // t0
  go logResult("123")

  // wait until t1
  go logResult("201")

  // wait until t2
  go logResult("123")

  // when t0`
  // &Product{ID: "123"}, nil, true
  // &Product{ID: "123"}, nil, true

  // wait until t3
  go logResult("123")

  // when t1`
  // &Product{ID: "201"}, nil, false

  // when t3`
  // &Product{ID: "123"}, nil, false
}

응답이 캐싱되지는 않는다. 키에 대한 작업을 이미 한번 처리했더라도 진행중인 요청이 없다면 새로 전송한다.