问题 如何正确使用sync.Cond?


我无法弄清楚如何正确使用 sync.Cond。据我所知,锁定Locker和调用条件的Wait方法之间存在竞争条件。此示例在主goroutine中的两条线之间添加了一个人为延迟,以模拟竞争条件:

package main

import (
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        time.Sleep(1 * time.Second)
        c.Broadcast()
    }()
    m.Lock()
    time.Sleep(2 * time.Second)
    c.Wait()
}

[在Go Playground上运行]

这会立即引起恐慌:

致命错误:所有goroutines都睡着了 - 僵局!

goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire(0x10330208,0x1)
    /usr/local/go/src/runtime/sema.go:241 + 0x2e0
sync。(* Cond).Wait(0x10330200,0x0)
    /usr/local/go/src/sync/cond.go:63 + 0xe0
main.main()
    /tmp/sandbox301865429/main.go:17 + 0x1a0

我究竟做错了什么?我该如何避免这种明显的竞争状况?我应该使用更好的同步构造吗?


编辑: 我意识到我应该更好地解释我在这里要解决的问题。我有一个长期运行的goroutine,下载一个大文件和许多其他goroutine,当它们可用时需要访问HTTP标头。这个问题比听起来更难。

我不能使用频道,因为只有一个goroutine会收到该值。而其他一些goroutine会在它们已经可用之后很久就试图检索它们。

下载程序goroutine可以简单地将HTTP标头存储在变量中,并使用互斥锁来保护对它们的访问。但是,这并没有为其他goroutines提供“等待”它们可用的方法。

我曾经想过这两个 sync.Mutex 和 sync.Cond 一起可以完成这个目标,但似乎这是不可能的。


11930
2018-04-26 06:43


起源



答案:


OP回答了他自己,但没有直接回答原来的问题,我将发布如何正确使用 sync.Cond

你真的不需要 sync.Cond 如果每次写入和读取都有一个goroutine - 一个 sync.Mutex 他们之间的沟通就足够了。 sync.Cond 在多个读者等待共享资源可用的情况下可能很有用。

var sharedRsc = make(map[string]interface{})
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc1"])
        c.L.Unlock()
        wg.Done()
    }()

    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc2"])
        c.L.Unlock()
        wg.Done()
    }()

    // this one writes changes to sharedRsc
    c.L.Lock()
    sharedRsc["rsc1"] = "foo"
    sharedRsc["rsc2"] = "bar"
    c.Broadcast()
    c.L.Unlock()
    wg.Wait()
}

操场

话虽如此,如果情况允许,仍然建议使用频道传递数据。

注意: sync.WaitGroup 这里只用于等待goroutines完成执行。


6
2018-03-13 20:25





您需要确保调用c.Broadcast  你打电话给c.Wait。您的程序的正确版本将是:

package main

import (
    "fmt"
    "sync"
)

func main() {
    m := &sync.Mutex{}
    c := sync.NewCond(m)
    m.Lock()
    go func() {
        m.Lock() // Wait for c.Wait()
        c.Broadcast()
        m.Unlock()
    }()
    c.Wait() // Unlocks m
}

https://play.golang.org/p/O1r8v8yW6h


4
2018-05-17 03:56





package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    m.Lock() // main gouroutine is owner of lock
    c := sync.NewCond(&m)
    go func() {
        m.Lock() // obtain a lock
        defer m.Unlock()
        fmt.Println("3. goroutine is owner of lock")
        time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
        c.Broadcast()               // State has been changed, publish it to waiting goroutines
        fmt.Println("4. goroutine will release lock soon (deffered Unlock")
    }()
    fmt.Println("1. main goroutine is owner of lock")
    time.Sleep(1 * time.Second) // initialization
    fmt.Println("2. main goroutine is still lockek")
    c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
    // Because you don't know, whether this is state, that you are waiting for, is usually called in loop.
    m.Unlock()
    fmt.Println("Done")
}

http://play.golang.org/p/fBBwoL7_pm


2
2018-04-26 08:14



如果在启动goroutine之前无法锁定互斥锁怎么办?例如,可能有其他goroutines调用Wait()。 - Nathan Osman
可能的是,当调用广播时,不会通知其他goroutine。它也很好 - 但我们都没有提到 - 通常情况与某种状态有关。等待意味着 - 系统处于此状态时我无法继续,等待。广播意味着 - 状态改变了,每个等待的人都应该检查他是否可以继续。请更准确地描述两个goroutine中计算的内容,以及它们为什么必须相互通信。 - lofcek
对不起,我应该在原始问题中详细介绍。我添加了一个编辑,准确描述了我正在尝试做的事情。 - Nathan Osman
我试图理解这一点,对我而言,这似乎引入了潜在的竞争条件。 c.Wait()释放互斥,然后它开始等待通知。理论上,当c.Wait()释放互斥锁时,但在将其自身添加到通知列表之前,goroutine可以锁定互斥锁,然后运行Broadcast(),之后c.Wait()可以将自身添加到通知列表中并等待广播。当然在上面的例子中,它几乎永远不会发生因为time.Sleep在广播之前为c提供了足够的时间等待在广播之前将自己添加到通知列表。 - Kamil Dziedzic
哇,nvm,这在1.7中得到修复我正在查看旧代码: github.com/golang/go/issues/14064 - Kamil Dziedzic


答案:


OP回答了他自己,但没有直接回答原来的问题,我将发布如何正确使用 sync.Cond

你真的不需要 sync.Cond 如果每次写入和读取都有一个goroutine - 一个 sync.Mutex 他们之间的沟通就足够了。 sync.Cond 在多个读者等待共享资源可用的情况下可能很有用。

var sharedRsc = make(map[string]interface{})
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc1"])
        c.L.Unlock()
        wg.Done()
    }()

    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc2"])
        c.L.Unlock()
        wg.Done()
    }()

    // this one writes changes to sharedRsc
    c.L.Lock()
    sharedRsc["rsc1"] = "foo"
    sharedRsc["rsc2"] = "bar"
    c.Broadcast()
    c.L.Unlock()
    wg.Wait()
}

操场

话虽如此,如果情况允许,仍然建议使用频道传递数据。

注意: sync.WaitGroup 这里只用于等待goroutines完成执行。


6
2018-03-13 20:25





您需要确保调用c.Broadcast  你打电话给c.Wait。您的程序的正确版本将是:

package main

import (
    "fmt"
    "sync"
)

func main() {
    m := &sync.Mutex{}
    c := sync.NewCond(m)
    m.Lock()
    go func() {
        m.Lock() // Wait for c.Wait()
        c.Broadcast()
        m.Unlock()
    }()
    c.Wait() // Unlocks m
}

https://play.golang.org/p/O1r8v8yW6h


4
2018-05-17 03:56





package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    m.Lock() // main gouroutine is owner of lock
    c := sync.NewCond(&m)
    go func() {
        m.Lock() // obtain a lock
        defer m.Unlock()
        fmt.Println("3. goroutine is owner of lock")
        time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
        c.Broadcast()               // State has been changed, publish it to waiting goroutines
        fmt.Println("4. goroutine will release lock soon (deffered Unlock")
    }()
    fmt.Println("1. main goroutine is owner of lock")
    time.Sleep(1 * time.Second) // initialization
    fmt.Println("2. main goroutine is still lockek")
    c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
    // Because you don't know, whether this is state, that you are waiting for, is usually called in loop.
    m.Unlock()
    fmt.Println("Done")
}

http://play.golang.org/p/fBBwoL7_pm


2
2018-04-26 08:14



如果在启动goroutine之前无法锁定互斥锁怎么办?例如,可能有其他goroutines调用Wait()。 - Nathan Osman
可能的是,当调用广播时,不会通知其他goroutine。它也很好 - 但我们都没有提到 - 通常情况与某种状态有关。等待意味着 - 系统处于此状态时我无法继续,等待。广播意味着 - 状态改变了,每个等待的人都应该检查他是否可以继续。请更准确地描述两个goroutine中计算的内容,以及它们为什么必须相互通信。 - lofcek
对不起,我应该在原始问题中详细介绍。我添加了一个编辑,准确描述了我正在尝试做的事情。 - Nathan Osman
我试图理解这一点,对我而言,这似乎引入了潜在的竞争条件。 c.Wait()释放互斥,然后它开始等待通知。理论上,当c.Wait()释放互斥锁时,但在将其自身添加到通知列表之前,goroutine可以锁定互斥锁,然后运行Broadcast(),之后c.Wait()可以将自身添加到通知列表中并等待广播。当然在上面的例子中,它几乎永远不会发生因为time.Sleep在广播之前为c提供了足够的时间等待在广播之前将自己添加到通知列表。 - Kamil Dziedzic
哇,nvm,这在1.7中得到修复我正在查看旧代码: github.com/golang/go/issues/14064 - Kamil Dziedzic


看起来你c.Wait for Broadcast,你的时间间隔永远不会发生。 同

time.Sleep(3 * time.Second) //Broadcast after any Wait for it
c.Broadcast()

你的片段似乎工作 http://play.golang.org/p/OE8aP4i6gY 我是否错过了你试图获得的东西?


2
2018-04-26 10:27





我终于找到了一种方法来做到这一点并没有涉及到 sync.Cond 完全 - 只是互斥体。

type Task struct {
    m       sync.Mutex
    headers http.Header
}

func NewTask() *Task {
    t := &Task{}
    t.m.Lock()
    go func() {
        defer t.m.Unlock()
        // ...do stuff...
    }()
    return t
}

func (t *Task) WaitFor() http.Header {
    t.m.Lock()
    defer t.m.Unlock()
    return t.headers
}

这个怎么用?

互斥锁在任务开始时被锁定,确保任何调用 WaitFor()会阻止。一旦标题可用并且互斥锁解锁互斥锁,每次调用 WaitFor() 将一次执行一个。所有未来的调用(即使在goroutine结束之后)都可以锁定互斥锁,因为它总是会被解锁。


1
2018-05-06 04:37





这是一个有两个例程的实际例子。他们一个接一个地开始,但是第二个人在继续之前等待第一个播出的条件:

package main

import (
    "sync"
    "fmt"
    "time"
)

func main() {
    lock := sync.Mutex{}
    lock.Lock()

    cond := sync.NewCond(&lock)

    waitGroup := sync.WaitGroup{}
    waitGroup.Add(2)

    go func() {
        defer waitGroup.Done()

        fmt.Println("First go routine has started and waits for 1 second before broadcasting condition")

        time.Sleep(1 * time.Second)

        fmt.Println("First go routine broadcasts condition")

        cond.Broadcast()
    }()

    go func() {
        defer waitGroup.Done()

        fmt.Println("Second go routine has started and is waiting on condition")

        cond.Wait()

        fmt.Println("Second go routine unlocked by condition broadcast")
    }()

    fmt.Println("Main go routine starts waiting")

    waitGroup.Wait()

    fmt.Println("Main go routine ends")
}

输出可能略有不同,因为第二个例程可以在第一个例程之前开始,反之亦然:

Main go routine starts waiting
Second go routine has started and is waiting on condition
First go routine has started and waits for 1 second before broadcasting condition
First go routine broadcasts condition
Second go routine unlocked by condition broadcast
Main go routine ends

https://gist.github.com/fracasula/21565ea1cf0c15726ca38736031edc70


1
2017-08-03 14:16