使用goroutine需要注意的点:
注:强烈推荐看看文末引用里关于goroutine leak的文章
下面无法控制goroutine的例子,只能靠log.Fatal调用底层的os.exit退出,但这会使整个进程退出,有些正在处理的状态会因此中断,产生不可预料的异常;
在startServer内使用goroutine,调用者main函数并不知道其使用了goroutine,实际场景下可能会遗漏管理其非阻塞的特征。
package main
import (
"log"
"net/http"
)
func main() {
// some goroutine...
startApp()
startDebug()
select {}
}
func startApp() {
go func() {
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}()
}
func startDebug() {
go func() {
if err := http.ListenAndServe(":8081", nil); err != nil {
log.Fatal(err)
}
}()
}
package main
import (
"context"
"fmt"
"log"
"net/http"
)
func startServer(addr string, stop <-chan struct{}) error {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "GET / OK")
})
s := http.Server{
Addr: addr,
Handler: mux,
}
go func() {
<-stop
log.Printf("addr: %s shuntDown", addr)
s.Shutdown(context.Background())
}()
return s.ListenAndServe()
}
func main() {
stop := make(chan struct{})
done := make(chan error, 2)
go func() {
done <- startServer(":8080", stop) // app
}()
go func() {
done <- startServer(":8081", stop) // debug
}()
for i := 0; i < cap(done); i++ {
if err := <-done; err != nil {
close(stop)
}
}
}
package main
import (
"context"
"log"
"net/http"
"time"
)
func startServer(ctx context.Context, addr string) error {
s := http.Server{
Addr: addr,
}
go func(ctx context.Context) {
<-ctx.Done()
log.Printf("addr: %s shuntDown", addr)
s.Shutdown(ctx)
}(ctx)
log.Printf("addr: %s start", addr)
return s.ListenAndServe()
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
startServer(ctx, ":8080") // app
}()
go func() {
startServer(ctx, ":8081") // debug
}()
cancel()
select {
case <-time.After(2 * time.Second):
log.Print("done")
}
}
package main
import (
"context"
"errors"
"log"
"net/http"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
return startServer(ctx, "3000")
})
g.Go(func() error {
return startServer(ctx, "4000")
})
g.Go(func() error {
time.Sleep(2 * time.Second)
return errors.New("it's time to exit")
})
err := g.Wait()
log.Printf("exit, err: %v\n", err)
}
func startServer(ctx context.Context, port string) error {
log.Printf("startServer: %s\n", port)
srv := http.Server{Addr: ":" + port}
go func(ctx context.Context) {
<-ctx.Done()
if err := srv.Shutdown(ctx); err != nil {
log.Printf("HTTP server %s Shutdown error: %v", srv.Addr, err)
}
log.Printf("HTTP server %s Shutdown successfully", srv.Addr)
}(ctx)
return srv.ListenAndServe()
}
errors.New返回的是指针,能够避免error字符串相同导致错误被误以为一致。
package main
import (
"errors"
"log"
)
type myErrorString struct {
s string
}
func (e myErrorString) Error() string {
return e.s
}
// New create my error
func New(s string) error {
return myErrorString{s} // 这里返回的是字符串,而errors.New返回的是指针
}
func main() {
var err1 = errors.New("test_err")
var err2 = errors.New("test_err")
if err1 == err2 {
log.Println("err1 == err2")
}
var err3 = New("test_err")
var err4 = New("test_err")
if err3 == err4 {
log.Println("err3 == err4")
}
}
package main
import (
"log"
"errors"
)
// ErrNotFound some data is not found
var ErrNotFound = errors.New("not found")
func dao(id string) (interface{}, error) {
return nil, errors.New("not found")
}
func service(id string) (interface{}, error) {
data, err := dao(id)
if err != nil {
return nil, err
}
return data, nil
}
func api() {
id := "test"
data, err := service(id)
if err != nil {
// only print: "err: not found"
log.Printf("err: %+v\n", err)
return
}
log.Printf("found data: %+v\n", data)
}
func main() {
api()
}
package main
import (
"log"
"github.com/pkg/errors"
)
// ErrNotFound some data is not found
var ErrNotFound = errors.New("not found")
func dao(id string) (interface{}, error) {
return nil, errors.Wrapf(ErrNotFound, "id: %s is not found", id)
}
func service(id string) (interface{}, error) {
data, err := dao(id)
if err != nil {
return nil, err
}
return data, nil
}
func api() {
id := "test"
data, err := service(id)
if errors.Cause(err) == ErrNotFound {
// 打印出错误的堆栈
log.Printf("stack trace: \n%+v\n", err)
// 只打印根错误
log.Printf("original err: %+v\n", errors.Cause(err))
return
}
log.Printf("found data: %+v\n", data)
}
func main() {
api()
}
见上文中goroutine生命周期管理的【使用context管理】的代码。
context可以存储kv数据,且数据可以通过context.WithValue传递给子函数,如果子函数从当前context的kv找不到,会自动递归父级的context查找,直到找到或父级为nil。
package main
import (
"context"
"fmt"
)
type favContextKey string
var parentkey = favContextKey("parentKey")
var childKey = favContextKey("childKey")
func parent() {
ctx := context.WithValue(context.Background(), parentkey, "parentVal")
fmt.Printf("parentFn: parentVal: %+v\n", ctx.Value(parentkey))
child(context.WithValue(ctx, childKey, "childVal"))
}
func child(ctx context.Context) {
fmt.Printf("childFn: parentVal: %+v\n", ctx.Value(parentkey))
fmt.Printf("childFn: childVal: %+v\n", ctx.Value(childKey))
}
func main() {
parent()
}
一般是用依赖注入的方式,方便单元测试;但依赖也因此要显式地在代码里不断传递,一般是自行进行依赖管理,如果依赖多了就需要不少这样的代码,wire能够减少依赖管理的代理,使其更加简洁。
详情看官方文档示例,比较清晰:https://github.com/google/wire/blob/main/_tutorial/README.md
方法:锁(互斥锁、读写锁、原子操作), chan实现无(显示)阻塞的内存同步;
由于atomic使用的是copy-on-write的方法在多任务间共享内存,所以不适用于大量数据的共享;
性能:原子操作> 读写锁 >?互斥锁 >? chan;
原子操作:使用的是操作系统提供的CAS,不需要进行进程、线程或goroutine的切换;
读写锁、互斥锁:需要暂停其它goroutine,只保留相关的goroutine,涉及到goroutine的唤醒、暂停及其上下文切换的消耗;
chan:使用互斥锁实现,但其核心思想在于避免显示加锁,用通信的方式解决内存共享的多goroutine编程问题,因为锁编程的复杂度可能带来死锁、活锁、资源耗尽等风险;
go写并发的核心思想:
>
Share memory by communicating, don’t communicate by sharing memory.
package concurrency
import (
"sync"
"sync/atomic"
"testing"
)
type Config struct {
a []int
}
var readerCount = 1
func BenchmarkMutexMultipleReaders(b *testing.B) {
var lastValue uint64
var lock sync.RWMutex
cfg := Config{
a: []int{0, 0, 0, 0, 0, 0, 0},
}
var wg sync.WaitGroup
for n := 0; n < readerCount; n++ {
wg.Add(1)
go func() {
for n := 0; n < b.N; n++ {
lock.RLock()
atomic.SwapUint64(&lastValue, uint64(cfg.a[0]))
lock.RUnlock()
}
wg.Done()
}()
}
wg.Wait()
}
func BenchmarkAtomicMultipleReaders(b *testing.B) {
var lastValue uint64
var v atomic.Value
cfg := Config{
a: []int{0, 0, 0, 0, 0, 0, 0},
}
v.Store(cfg)
var wg sync.WaitGroup
for n := 0; n < readerCount; n++ {
wg.Add(1)
go func() {
for n := 0; n < b.N; n++ {
d := v.Load().(Config)
atomic.SwapUint64(&lastValue, uint64(d.a[0]))
}
wg.Done()
}()
}
wg.Wait()
}
func BenchmarkMutexOneWriterMultipleReaders(b *testing.B) {
var lastValue uint64
var lock sync.RWMutex
var cfg = Config{
a: []int{0, 0, 0, 0, 0, 0, 0},
}
go func() {
var i = 0
for n := 0; n < b.N; n++ {
i++
lock.Lock()
cfg = Config{
a: []int{i, i + 1, 0, 0, 0, 0, 0},
}
lock.Unlock()
}
}()
var wg sync.WaitGroup
for n := 0; n < readerCount; n++ {
wg.Add(1)
go func() {
for n := 0; n < b.N; n++ {
lock.RLock()
atomic.SwapUint64(&lastValue, uint64(cfg.a[0]))
lock.RUnlock()
}
wg.Done()
}()
}
wg.Wait()
}
func BenchmarkAtomicOneWriterMultipleReaders(b *testing.B) {
var lastValue uint64
var v atomic.Value
var cfg = Config{
a: []int{0, 0, 0, 0, 0, 0, 0},
}
v.Store(cfg)
go func() {
var i = 0
for n := 0; n < b.N; n++ {
i++
cfg = Config{
a: []int{i, i + 1, 0, 0, 0, 0, 0},
}
v.Store(cfg)
}
}()
var wg sync.WaitGroup
for n := 0; n < readerCount; n++ {
wg.Add(1)
go func() {
for n := 0; n < b.N; n++ {
d := v.Load().(Config)
atomic.SwapUint64(&lastValue, uint64(d.a[0]))
}
wg.Done()
}()
}
wg.Wait()
}
func BenchmarkChanOneWriterMultipleReaders(b *testing.B) {
rc := make(chan int, 1000)
b.ResetTimer()
var lastValue uint64
var cfg = Config{
a: []int{0, 0, 0, 0, 0, 0, 0},
}
go func() {
var i = 0
for msg := range rc {
i++
switch msg {
case 0:
cfg = Config{
a: []int{i, i + 1, 0, 0, 0, 0, 0},
}
case 1:
atomic.SwapUint64(&lastValue, uint64(cfg.a[0]))
}
}
}()
go func() {
for n := 0; n < b.N; n++ {
rc <- 0
}
}()
var wg sync.WaitGroup
for n := 0; n < readerCount; n++ {
wg.Add(1)
go func() {
for n := 0; n < b.N; n++ {
rc <- 1
}
wg.Done()
}()
}
wg.Wait()
}
readerCounts为4的结果:
readerCounts为1的结果:
感谢您的阅读!
如果看完后有任何疑问,欢迎拍砖。
欢迎转载,转载请注明出处:http://www.yangrunwei.com/a/116.html
邮箱:glowrypauky@gmail.com
QQ: 892413924