golang语言实践

作者:杨润炜
日期:2021/3/14 16:08

goroutine生命周期管理

使用goroutine需要注意的点:

  1. 启动goroutine后,需要有机制能控制其如何退出,避免goroutine泄漏造成系统不稳定的风险;
  2. 尽量让调用者来决定是否使用goroutine;

注:强烈推荐看看文末引用里关于goroutine leak的文章

反例

下面无法控制goroutine的例子,只能靠log.Fatal调用底层的os.exit退出,但这会使整个进程退出,有些正在处理的状态会因此中断,产生不可预料的异常;
在startServer内使用goroutine,调用者main函数并不知道其使用了goroutine,实际场景下可能会遗漏管理其非阻塞的特征。

  1. package main
  2. import (
  3. "log"
  4. "net/http"
  5. )
  6. func main() {
  7. // some goroutine...
  8. startApp()
  9. startDebug()
  10. select {}
  11. }
  12. func startApp() {
  13. go func() {
  14. if err := http.ListenAndServe(":8080", nil); err != nil {
  15. log.Fatal(err)
  16. }
  17. }()
  18. }
  19. func startDebug() {
  20. go func() {
  21. if err := http.ListenAndServe(":8081", nil); err != nil {
  22. log.Fatal(err)
  23. }
  24. }()
  25. }

使用chan管理

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. )
  8. func startServer(addr string, stop <-chan struct{}) error {
  9. mux := http.NewServeMux()
  10. mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  11. fmt.Fprintf(w, "GET / OK")
  12. })
  13. s := http.Server{
  14. Addr: addr,
  15. Handler: mux,
  16. }
  17. go func() {
  18. <-stop
  19. log.Printf("addr: %s shuntDown", addr)
  20. s.Shutdown(context.Background())
  21. }()
  22. return s.ListenAndServe()
  23. }
  24. func main() {
  25. stop := make(chan struct{})
  26. done := make(chan error, 2)
  27. go func() {
  28. done <- startServer(":8080", stop) // app
  29. }()
  30. go func() {
  31. done <- startServer(":8081", stop) // debug
  32. }()
  33. for i := 0; i < cap(done); i++ {
  34. if err := <-done; err != nil {
  35. close(stop)
  36. }
  37. }
  38. }

使用context管理

  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "net/http"
  6. "time"
  7. )
  8. func startServer(ctx context.Context, addr string) error {
  9. s := http.Server{
  10. Addr: addr,
  11. }
  12. go func(ctx context.Context) {
  13. <-ctx.Done()
  14. log.Printf("addr: %s shuntDown", addr)
  15. s.Shutdown(ctx)
  16. }(ctx)
  17. log.Printf("addr: %s start", addr)
  18. return s.ListenAndServe()
  19. }
  20. func main() {
  21. ctx, cancel := context.WithCancel(context.Background())
  22. go func() {
  23. startServer(ctx, ":8080") // app
  24. }()
  25. go func() {
  26. startServer(ctx, ":8081") // debug
  27. }()
  28. cancel()
  29. select {
  30. case <-time.After(2 * time.Second):
  31. log.Print("done")
  32. }
  33. }

使用errgroup管理

  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "log"
  6. "net/http"
  7. "time"
  8. "golang.org/x/sync/errgroup"
  9. )
  10. func main() {
  11. g, ctx := errgroup.WithContext(context.Background())
  12. g.Go(func() error {
  13. return startServer(ctx, "3000")
  14. })
  15. g.Go(func() error {
  16. return startServer(ctx, "4000")
  17. })
  18. g.Go(func() error {
  19. time.Sleep(2 * time.Second)
  20. return errors.New("it's time to exit")
  21. })
  22. err := g.Wait()
  23. log.Printf("exit, err: %v\n", err)
  24. }
  25. func startServer(ctx context.Context, port string) error {
  26. log.Printf("startServer: %s\n", port)
  27. srv := http.Server{Addr: ":" + port}
  28. go func(ctx context.Context) {
  29. <-ctx.Done()
  30. if err := srv.Shutdown(ctx); err != nil {
  31. log.Printf("HTTP server %s Shutdown error: %v", srv.Addr, err)
  32. }
  33. log.Printf("HTTP server %s Shutdown successfully", srv.Addr)
  34. }(ctx)
  35. return srv.ListenAndServe()
  36. }

错误处理

  1. error是一个值,而不是异常;
  2. panic意味着程序挂了,谨慎使用;
  3. errors.New返回指针类型,详情看error源码
  4. 使用errors.Wrapf跟踪错误堆栈;

errors.New返回指针的原因

errors.New返回的是指针,能够避免error字符串相同导致错误被误以为一致。

  1. package main
  2. import (
  3. "errors"
  4. "log"
  5. )
  6. type myErrorString struct {
  7. s string
  8. }
  9. func (e myErrorString) Error() string {
  10. return e.s
  11. }
  12. // New create my error
  13. func New(s string) error {
  14. return myErrorString{s} // 这里返回的是字符串,而errors.New返回的是指针
  15. }
  16. func main() {
  17. var err1 = errors.New("test_err")
  18. var err2 = errors.New("test_err")
  19. if err1 == err2 {
  20. log.Println("err1 == err2")
  21. }
  22. var err3 = New("test_err")
  23. var err4 = New("test_err")
  24. if err3 == err4 {
  25. log.Println("err3 == err4")
  26. }
  27. }

用errors.Wrapf跟踪异常的堆栈信息

使用系统内置errors

  1. package main
  2. import (
  3. "log"
  4. "errors"
  5. )
  6. // ErrNotFound some data is not found
  7. var ErrNotFound = errors.New("not found")
  8. func dao(id string) (interface{}, error) {
  9. return nil, errors.New("not found")
  10. }
  11. func service(id string) (interface{}, error) {
  12. data, err := dao(id)
  13. if err != nil {
  14. return nil, err
  15. }
  16. return data, nil
  17. }
  18. func api() {
  19. id := "test"
  20. data, err := service(id)
  21. if err != nil {
  22. // only print: "err: not found"
  23. log.Printf("err: %+v\n", err)
  24. return
  25. }
  26. log.Printf("found data: %+v\n", data)
  27. }
  28. func main() {
  29. api()
  30. }

从errors.Wrapf拿到错误堆栈信息

  1. package main
  2. import (
  3. "log"
  4. "github.com/pkg/errors"
  5. )
  6. // ErrNotFound some data is not found
  7. var ErrNotFound = errors.New("not found")
  8. func dao(id string) (interface{}, error) {
  9. return nil, errors.Wrapf(ErrNotFound, "id: %s is not found", id)
  10. }
  11. func service(id string) (interface{}, error) {
  12. data, err := dao(id)
  13. if err != nil {
  14. return nil, err
  15. }
  16. return data, nil
  17. }
  18. func api() {
  19. id := "test"
  20. data, err := service(id)
  21. if errors.Cause(err) == ErrNotFound {
  22. // 打印出错误的堆栈
  23. log.Printf("stack trace: \n%+v\n", err)
  24. // 只打印根错误
  25. log.Printf("original err: %+v\n", errors.Cause(err))
  26. return
  27. }
  28. log.Printf("found data: %+v\n", data)
  29. }
  30. func main() {
  31. api()
  32. }

context的应用

  1. 级联取消goroutine;
  2. 数据共享、传递;

级联取消goroutine;

0_1615129527189_4ec10931-66fc-4534-b4c1-06c93dc573ab-image.png
见上文中goroutine生命周期管理的【使用context管理】的代码。

数据共享、传递

context可以存储kv数据,且数据可以通过context.WithValue传递给子函数,如果子函数从当前context的kv找不到,会自动递归父级的context查找,直到找到或父级为nil。
0_1615128228780_8a198b9a-cd0f-4cbb-b8fa-ae2fea26193f-image.png

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. )
  6. type favContextKey string
  7. var parentkey = favContextKey("parentKey")
  8. var childKey = favContextKey("childKey")
  9. func parent() {
  10. ctx := context.WithValue(context.Background(), parentkey, "parentVal")
  11. fmt.Printf("parentFn: parentVal: %+v\n", ctx.Value(parentkey))
  12. child(context.WithValue(ctx, childKey, "childVal"))
  13. }
  14. func child(ctx context.Context) {
  15. fmt.Printf("childFn: parentVal: %+v\n", ctx.Value(parentkey))
  16. fmt.Printf("childFn: childVal: %+v\n", ctx.Value(childKey))
  17. }
  18. func main() {
  19. parent()
  20. }

依赖管理

一般是用依赖注入的方式,方便单元测试;但依赖也因此要显式地在代码里不断传递,一般是自行进行依赖管理,如果依赖多了就需要不少这样的代码,wire能够减少依赖管理的代理,使其更加简洁。
详情看官方文档示例,比较清晰:https://github.com/google/wire/blob/main/_tutorial/README.md

内存同步的方法与性能比较

  1. 方法:锁(互斥锁、读写锁、原子操作), chan实现无(显示)阻塞的内存同步;

  2. 由于atomic使用的是copy-on-write的方法在多任务间共享内存,所以不适用于大量数据的共享;

  3. 性能:原子操作> 读写锁 >?互斥锁 >? chan;
    原子操作:使用的是操作系统提供的CAS,不需要进行进程、线程或goroutine的切换;
    读写锁、互斥锁:需要暂停其它goroutine,只保留相关的goroutine,涉及到goroutine的唤醒、暂停及其上下文切换的消耗;
    chan:使用互斥锁实现,但其核心思想在于避免显示加锁,用通信的方式解决内存共享的多goroutine编程问题,因为锁编程的复杂度可能带来死锁、活锁、资源耗尽等风险;

go写并发的核心思想:
>
Share memory by communicating, don’t communicate by sharing memory.

  1. package concurrency
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "testing"
  6. )
  7. type Config struct {
  8. a []int
  9. }
  10. var readerCount = 1
  11. func BenchmarkMutexMultipleReaders(b *testing.B) {
  12. var lastValue uint64
  13. var lock sync.RWMutex
  14. cfg := Config{
  15. a: []int{0, 0, 0, 0, 0, 0, 0},
  16. }
  17. var wg sync.WaitGroup
  18. for n := 0; n < readerCount; n++ {
  19. wg.Add(1)
  20. go func() {
  21. for n := 0; n < b.N; n++ {
  22. lock.RLock()
  23. atomic.SwapUint64(&lastValue, uint64(cfg.a[0]))
  24. lock.RUnlock()
  25. }
  26. wg.Done()
  27. }()
  28. }
  29. wg.Wait()
  30. }
  31. func BenchmarkAtomicMultipleReaders(b *testing.B) {
  32. var lastValue uint64
  33. var v atomic.Value
  34. cfg := Config{
  35. a: []int{0, 0, 0, 0, 0, 0, 0},
  36. }
  37. v.Store(cfg)
  38. var wg sync.WaitGroup
  39. for n := 0; n < readerCount; n++ {
  40. wg.Add(1)
  41. go func() {
  42. for n := 0; n < b.N; n++ {
  43. d := v.Load().(Config)
  44. atomic.SwapUint64(&lastValue, uint64(d.a[0]))
  45. }
  46. wg.Done()
  47. }()
  48. }
  49. wg.Wait()
  50. }
  51. func BenchmarkMutexOneWriterMultipleReaders(b *testing.B) {
  52. var lastValue uint64
  53. var lock sync.RWMutex
  54. var cfg = Config{
  55. a: []int{0, 0, 0, 0, 0, 0, 0},
  56. }
  57. go func() {
  58. var i = 0
  59. for n := 0; n < b.N; n++ {
  60. i++
  61. lock.Lock()
  62. cfg = Config{
  63. a: []int{i, i + 1, 0, 0, 0, 0, 0},
  64. }
  65. lock.Unlock()
  66. }
  67. }()
  68. var wg sync.WaitGroup
  69. for n := 0; n < readerCount; n++ {
  70. wg.Add(1)
  71. go func() {
  72. for n := 0; n < b.N; n++ {
  73. lock.RLock()
  74. atomic.SwapUint64(&lastValue, uint64(cfg.a[0]))
  75. lock.RUnlock()
  76. }
  77. wg.Done()
  78. }()
  79. }
  80. wg.Wait()
  81. }
  82. func BenchmarkAtomicOneWriterMultipleReaders(b *testing.B) {
  83. var lastValue uint64
  84. var v atomic.Value
  85. var cfg = Config{
  86. a: []int{0, 0, 0, 0, 0, 0, 0},
  87. }
  88. v.Store(cfg)
  89. go func() {
  90. var i = 0
  91. for n := 0; n < b.N; n++ {
  92. i++
  93. cfg = Config{
  94. a: []int{i, i + 1, 0, 0, 0, 0, 0},
  95. }
  96. v.Store(cfg)
  97. }
  98. }()
  99. var wg sync.WaitGroup
  100. for n := 0; n < readerCount; n++ {
  101. wg.Add(1)
  102. go func() {
  103. for n := 0; n < b.N; n++ {
  104. d := v.Load().(Config)
  105. atomic.SwapUint64(&lastValue, uint64(d.a[0]))
  106. }
  107. wg.Done()
  108. }()
  109. }
  110. wg.Wait()
  111. }
  112. func BenchmarkChanOneWriterMultipleReaders(b *testing.B) {
  113. rc := make(chan int, 1000)
  114. b.ResetTimer()
  115. var lastValue uint64
  116. var cfg = Config{
  117. a: []int{0, 0, 0, 0, 0, 0, 0},
  118. }
  119. go func() {
  120. var i = 0
  121. for msg := range rc {
  122. i++
  123. switch msg {
  124. case 0:
  125. cfg = Config{
  126. a: []int{i, i + 1, 0, 0, 0, 0, 0},
  127. }
  128. case 1:
  129. atomic.SwapUint64(&lastValue, uint64(cfg.a[0]))
  130. }
  131. }
  132. }()
  133. go func() {
  134. for n := 0; n < b.N; n++ {
  135. rc <- 0
  136. }
  137. }()
  138. var wg sync.WaitGroup
  139. for n := 0; n < readerCount; n++ {
  140. wg.Add(1)
  141. go func() {
  142. for n := 0; n < b.N; n++ {
  143. rc <- 1
  144. }
  145. wg.Done()
  146. }()
  147. }
  148. wg.Wait()
  149. }

readerCounts为4的结果:

readerCounts为1的结果:

Reference

goroutine-leak
atomic vs mutex
go-trace

感谢您的阅读!
如果看完后有任何疑问,欢迎拍砖。
欢迎转载,转载请注明出处:http://www.yangrunwei.com/a/116.html
邮箱:glowrypauky@gmail.com
QQ: 892413924