併發控制
Goroutine & Channel
CSP(communicating sequential processes)
Do not communicate by sharing memory; instead, share memory by communicating
sync.Once 的使用場景
常應用於單例模式,例如初始化配置、保持資料庫連線等。作用與init
函數類似,但有差別。
- init 函數是當所在的 package 首次被載入時執行,若遲遲未被使用,則既浪費了內存,又延長了程式載入時間。
- sync.Once 可以在程式碼的任意位置初始化和調用,因此可以延遲到使用時再執行,並發場景下是線程安全的。
在多數情況下,sync.Once
被用來控制變數的初始化,這個變數的讀寫滿足以下三個條件:
- 當且僅當第一次存取某個變數時,進行初始化
- 變數初始化過程中,所有讀取都被阻塞,直到初始化完成
- 變數僅初始化一次,初始化完成後駐留在記憶體。
sync.Once
僅提供了一個方法Do
,參數 f
是物件初始化函數。
func (o *Once) Do (f func () )
singleflight 的使用場景
將併發呼叫合併成一個呼叫的特性非常適合用來防止緩存擊穿
其他例子
情境 1
在多執行緒下共享變數
方法 1:sync.Mutex
package main
import (
"fmt"
"sync"
"time"
)
type SafeNumber struct {
v int
mux sync.Mutex
}
func main() {
total := SafeNumber{v: 0}
for i := 0; i < 1000; i++ {
go func() {
total.mux.Lock()
total.v++
total.mux.Unlock()
}()
}
time.Sleep(time.Second)
total.mux.Lock()
fmt.Println(total.v)
total.mux.Unlock()
}
方法 2:Channel
package main
import (
"fmt"
"time"
)
func main() {
total := 0
ch := make(chan int, 1)
ch <- total
for i := 0; i < 1000; i++ {
go func() {
ch <- <-ch + 1
}()
}
time.Sleep(time.Second)
fmt.Println(<-ch)
}
情境 2
多執行緒下,經常需要處理的是執行緒之間的狀態管理,其中一個經常發生的事情是等待,例如 A 執行緒需要等 B 執行緒計算並取得資料後才能繼續往下執行
方法 1:sync.WaitGroup
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func work(wg *sync.WaitGroup) {
defer wg.Done()
n := rand.Intn(20)
time.Sleep(time.Duration(n) * time.Second)
fmt.Printf("Done in %ds\n", n)
}
func main() {
wg := &sync.WaitGroup{}
wg.Add(2)
go work(wg)
go work(wg)
wg.Wait()
fmt.Println("Finished!")
}
方法 2:Channel
package main
import (
"fmt"
"math/rand"
"time"
)
func work(c chan string) {
time.Sleep(time.Duration(rand.Intn(20)) * time.Second)
fmt.Printf("Done in %ds\n", n)
c <- "DONE"
}
func main() {
ch := make(chan string)
go work(ch)
go work(ch)
<-ch
<-ch
fmt.Println("Finished!")
}
情境 3
需要同時處理 5 個背景工作,且需要收到這 5 個 Job 處理的結果,並顯示在畫面上,如果其中一個 Job 失敗,就跳出 main 函式,而這 5 個 JOB 預期在 10 秒內結束,如果超過 10 秒,也是一樣跳出 main 函式
package main
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
func work(outChan chan int, errChan chan error, val int, wg *sync.WaitGroup) {
defer wg.Done()
t := rand.Int31n(20)
time.Sleep(time.Duration(t) * time.Second)
if t > 5 {
fmt.Println("finished job id:", val)
outChan <- val
} else {
errChan <- errors.New(fmt.Sprintf("failed job id: %d", val))
}
}
func main() {
outChan := make(chan int)
errChan := make(chan error)
finishChan := make(chan struct{})
wg := &sync.WaitGroup{}
sum := 0
wg.Add(5)
for i := 0; i < 5; i++ {
go work(outChan, errChan, i, wg)
}
go func() {
wg.Wait()
close(finishChan)
}()
timeout := time.After(10 * time.Second)
LOOP:
for {
select {
case val := <-outChan:
sum += val
case err := <-errChan:
fmt.Println("error:", err)
break LOOP
case <-finishChan:
fmt.Println("finish all jobs")
break LOOP
case <-timeout:
fmt.Println("timeout")
break LOOP
}
}
fmt.Println("sum:", sum)
}
情境 4
控制併發的數量
package main
import (
"errors"
"fmt"
"sync"
)
func search(word string) (string, error) {
if word == "Go" {
return "", errors.New("error: Go") // 模擬結果
}
return fmt.Sprintf("result: %s", word), nil // 模擬結果
}
func batch_search(words []string) ([]string, error) {
var (
wg = sync.WaitGroup{}
once = sync.Once{}
results = make([]string, len(words))
tokens = make(chan struct{}, 10)
err error
)
for i, word := range words {
tokens <- struct{}{}
wg.Add(1)
go func(word string, i int) {
defer func() {
wg.Done()
<-tokens
}()
result, e := search(word)
if e != nil {
once.Do(func() {
err = e
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
return results, err
}
func main() {
words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
results, err := batch_search(words)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(results)
}
情境 5
goroutine 只要有一個出錯,其他 goroutine 就要停止繼續執行
方法 1:自己刻
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
func search(ctx context.Context, word string) (string, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
default:
if word == "Go" {
fmt.Println(word)
return "", errors.New("error: Go") // 模擬結果
}
time.Sleep(3 * time.Second)
fmt.Println(word)
return word, nil // 模擬結果
}
}
func batch_search(ctx context.Context, words []string) ([]string, error) {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)
var (
wg = sync.WaitGroup{}
once = sync.Once{}
results = make([]string, len(words))
tokens = make(chan struct{}, 10)
err error
)
for i, word := range words {
i, word := i, word
tokens <- struct{}{}
wg.Add(1)
go func(word string, i int) {
defer func() {
wg.Done()
<-tokens
}()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
cancel(e)
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
return results, err
}
func main() {
words := []string{"Go", "Rust", "Python", "JavaScript", "Java"}
results, err := batch_search(context.Background(), words)
if err != nil {
fmt.Println(err)
time.Sleep(5 * time.Second)
return
}
fmt.Println(results)
}
方法 2: errgroup
package main
import (
"context"
"errors"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func search(ctx context.Context, word string) (string, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
default:
if word == "Go" {
fmt.Println(word)
return "", errors.New("error: Go") // 模擬結果
}
time.Sleep(3 * time.Second)
fmt.Println(word)
return word, nil // 模擬結果
}
}
func batch_search(ctx context.Context, words []string) ([]string, error) {
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(10)
results := make([]string, len(words))
for i, word := range words {
i, word := i, word
eg.Go(func() error {
result, err := search(ctx, word)
if err != nil {
return err
}
results[i] = result
return nil
})
}
err := eg.Wait()
return results, err
}
func main() {
words := []string{"Go", "Rust", "Python", "JavaScript", "Java"}
results, err := batch_search(context.Background(), words)
if err != nil {
fmt.Println(err)
time.Sleep(5 * time.Second)
return
}
fmt.Println(results)
}
Check race condition
檢查器會報告所有已經發生的 race condition。然而,它只能偵測到運行時的 race condition;並不能保證之後不會發生。
go run -race main.go