跳轉到

併發控制

Goroutine & Channel

CSP(communicating sequential processes)

Do not communicate by sharing memory; instead, share memory by communicating

img img img

sync.Once 的使用場景

常應用於單例模式,例如初始化配置、保持資料庫連線等。作用與init函數類似,但有差別。

  • init 函數是當所在的 package 首次被載入時執行,若遲遲未被使用,則既浪費了內存,又延長了程式載入時間。
  • sync.Once 可以在程式碼的任意位置初始化和調用,因此可以延遲到使用時再執行,並發場景下是線程安全的。

在多數情況下,sync.Once被用來控制變數的初始化,這個變數的讀寫滿足以下三個條件:

  • 當且僅當第一次存取某個變數時,進行初始化
  • 變數初始化過程中,所有讀取都被阻塞,直到初始化完成
  • 變數僅初始化一次,初始化完成後駐留在記憶體。

sync.Once僅提供了一個方法Do,參數 f 是物件初始化函數。

func  (o *Once)  Do (f func () )

singleflight 的使用場景

將併發呼叫合併成一個呼叫的特性非常適合用來防止緩存擊穿

img

其他例子

情境 1

在多執行緒下共享變數

方法 1:sync.Mutex

package main

import (
    "fmt"
    "sync"
    "time"
)

type SafeNumber struct {
    v   int
    mux sync.Mutex
}

func main() {
    total := SafeNumber{v: 0}
    for i := 0; i < 1000; i++ {
        go func() {
            total.mux.Lock()
            total.v++
            total.mux.Unlock()
        }()
    }
    time.Sleep(time.Second)
    total.mux.Lock()
    fmt.Println(total.v)
    total.mux.Unlock()
}

方法 2:Channel

package main

import (
    "fmt"
    "time"
)

func main() {
    total := 0
    ch := make(chan int, 1)
    ch <- total
    for i := 0; i < 1000; i++ {
        go func() {
            ch <- <-ch + 1
        }()
    }
    time.Sleep(time.Second)
    fmt.Println(<-ch)
}

情境 2

多執行緒下,經常需要處理的是執行緒之間的狀態管理,其中一個經常發生的事情是等待,例如 A 執行緒需要等 B 執行緒計算並取得資料後才能繼續往下執行

方法 1:sync.WaitGroup

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func work(wg *sync.WaitGroup) {
      defer wg.Done()
      n := rand.Intn(20)
      time.Sleep(time.Duration(n) * time.Second)
      fmt.Printf("Done in %ds\n", n)
}

func main() {
    wg := &sync.WaitGroup{}
    wg.Add(2)
    go work(wg)
    go work(wg)
    wg.Wait()
    fmt.Println("Finished!")
}

方法 2:Channel

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func work(c chan string) {
    time.Sleep(time.Duration(rand.Intn(20)) * time.Second)
    fmt.Printf("Done in %ds\n", n)
    c <- "DONE"
}

func main() {
    ch := make(chan string)
    go work(ch)
    go work(ch)
    <-ch
    <-ch
    fmt.Println("Finished!")
}

情境 3

需要同時處理 5 個背景工作,且需要收到這 5 個 Job 處理的結果,並顯示在畫面上,如果其中一個 Job 失敗,就跳出 main 函式,而這 5 個 JOB 預期在 10 秒內結束,如果超過 10 秒,也是一樣跳出 main 函式

package main

import (
    "errors"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func work(outChan chan int, errChan chan error, val int, wg *sync.WaitGroup) {
    defer wg.Done()
    t := rand.Int31n(20)
    time.Sleep(time.Duration(t) * time.Second)
    if t > 5 {
        fmt.Println("finished job id:", val)
        outChan <- val
    } else {
        errChan <- errors.New(fmt.Sprintf("failed job id: %d", val))
    }
}

func main() {
    outChan := make(chan int)
    errChan := make(chan error)
    finishChan := make(chan struct{})
    wg := &sync.WaitGroup{}
    sum := 0

    wg.Add(5)
    for i := 0; i < 5; i++ {
        go work(outChan, errChan, i, wg)
    }

    go func() {
        wg.Wait()
        close(finishChan)
    }()

    timeout := time.After(10 * time.Second)
LOOP:
    for {
        select {
        case val := <-outChan:
            sum += val
        case err := <-errChan:
            fmt.Println("error:", err)
            break LOOP
        case <-finishChan:
            fmt.Println("finish all jobs")
            break LOOP
        case <-timeout:
            fmt.Println("timeout")
            break LOOP
        }
    }

    fmt.Println("sum:", sum)
}

情境 4

控制併發的數量

package main

import (
    "errors"
    "fmt"
    "sync"
)

func search(word string) (string, error) {
    if word == "Go" {
        return "", errors.New("error: Go") // 模擬結果
    }
    return fmt.Sprintf("result: %s", word), nil // 模擬結果
}

func batch_search(words []string) ([]string, error) {
    var (
        wg      = sync.WaitGroup{}
        once    = sync.Once{}
        results = make([]string, len(words))
        tokens  = make(chan struct{}, 10)
        err     error
    )

    for i, word := range words {
        tokens <- struct{}{}
        wg.Add(1)

        go func(word string, i int) {
            defer func() {
                wg.Done()
                <-tokens
            }()

            result, e := search(word)
            if e != nil {
                once.Do(func() {
                    err = e
                })

                return
            }

            results[i] = result
        }(word, i)
    }

    wg.Wait()

    return results, err
}

func main() {
    words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
    results, err := batch_search(words)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(results)
}

情境 5

goroutine 只要有一個出錯,其他 goroutine 就要停止繼續執行

方法 1:自己刻

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"
)

func search(ctx context.Context, word string) (string, error) {
    select {
    case <-ctx.Done():
        return "", ctx.Err()
    default:
        if word == "Go" {
            fmt.Println(word)
            return "", errors.New("error: Go") // 模擬結果
        }
        time.Sleep(3 * time.Second)
        fmt.Println(word)
        return word, nil // 模擬結果
    }
}

func batch_search(ctx context.Context, words []string) ([]string, error) {
    ctx, cancel := context.WithCancelCause(ctx)
    defer cancel(nil)

    var (
        wg      = sync.WaitGroup{}
        once    = sync.Once{}
        results = make([]string, len(words))
        tokens  = make(chan struct{}, 10)
        err     error
    )

    for i, word := range words {
        i, word := i, word

        tokens <- struct{}{}
        wg.Add(1)

        go func(word string, i int) {
            defer func() {
                wg.Done()
                <-tokens
            }()

            result, e := search(ctx, word)
            if e != nil {
                once.Do(func() {
                    err = e
                    cancel(e)
                })

                return
            }

            results[i] = result
        }(word, i)
    }

    wg.Wait()

    return results, err
}

func main() {
    words := []string{"Go", "Rust", "Python", "JavaScript", "Java"}
    results, err := batch_search(context.Background(), words)
    if err != nil {
        fmt.Println(err)
        time.Sleep(5 * time.Second)
        return
    }
    fmt.Println(results)
}

方法 2: errgroup

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func search(ctx context.Context, word string) (string, error) {
    select {
    case <-ctx.Done():
        return "", ctx.Err()
    default:
        if word == "Go" {
            fmt.Println(word)
            return "", errors.New("error: Go") // 模擬結果
        }
        time.Sleep(3 * time.Second)
        fmt.Println(word)
        return word, nil // 模擬結果
    }
}

func batch_search(ctx context.Context, words []string) ([]string, error) {
    eg, ctx := errgroup.WithContext(ctx)
    eg.SetLimit(10)

    results := make([]string, len(words))

    for i, word := range words {
        i, word := i, word

        eg.Go(func() error {
            result, err := search(ctx, word)
            if err != nil {
                return err
            }

            results[i] = result
            return nil
        })
    }

    err := eg.Wait()

    return results, err
}

func main() {
    words := []string{"Go", "Rust", "Python", "JavaScript", "Java"}
    results, err := batch_search(context.Background(), words)
    if err != nil {
        fmt.Println(err)
        time.Sleep(5 * time.Second)
        return
    }
    fmt.Println(results)
}

Check race condition

檢查器會報告所有已經發生的 race condition。然而,它只能偵測到運行時的 race condition;並不能保證之後不會發生。

go run -race main.go

Reference