问题 如何正确使用sync.Cond?
我无法弄清楚如何正确使用 sync.Cond
。据我所知,锁定Locker和调用条件的Wait方法之间存在竞争条件。此示例在主goroutine中的两条线之间添加了一个人为延迟,以模拟竞争条件:
package main
import (
"sync"
"time"
)
func main() {
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
time.Sleep(1 * time.Second)
c.Broadcast()
}()
m.Lock()
time.Sleep(2 * time.Second)
c.Wait()
}
[在Go Playground上运行]
这会立即引起恐慌:
致命错误:所有goroutines都睡着了 - 僵局!
goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire(0x10330208,0x1)
/usr/local/go/src/runtime/sema.go:241 + 0x2e0
sync。(* Cond).Wait(0x10330200,0x0)
/usr/local/go/src/sync/cond.go:63 + 0xe0
main.main()
/tmp/sandbox301865429/main.go:17 + 0x1a0
我究竟做错了什么?我该如何避免这种明显的竞争状况?我应该使用更好的同步构造吗?
编辑: 我意识到我应该更好地解释我在这里要解决的问题。我有一个长期运行的goroutine,下载一个大文件和许多其他goroutine,当它们可用时需要访问HTTP标头。这个问题比听起来更难。
我不能使用频道,因为只有一个goroutine会收到该值。而其他一些goroutine会在它们已经可用之后很久就试图检索它们。
下载程序goroutine可以简单地将HTTP标头存储在变量中,并使用互斥锁来保护对它们的访问。但是,这并没有为其他goroutines提供“等待”它们可用的方法。
我曾经想过这两个 sync.Mutex
和 sync.Cond
一起可以完成这个目标,但似乎这是不可能的。
11930
2018-04-26 06:43
起源
答案:
OP回答了他自己,但没有直接回答原来的问题,我将发布如何正确使用 sync.Cond
。
你真的不需要 sync.Cond
如果每次写入和读取都有一个goroutine - 一个 sync.Mutex
他们之间的沟通就足够了。 sync.Cond
在多个读者等待共享资源可用的情况下可能很有用。
var sharedRsc = make(map[string]interface{})
func main() {
var wg sync.WaitGroup
wg.Add(2)
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc1"])
c.L.Unlock()
wg.Done()
}()
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc2"])
c.L.Unlock()
wg.Done()
}()
// this one writes changes to sharedRsc
c.L.Lock()
sharedRsc["rsc1"] = "foo"
sharedRsc["rsc2"] = "bar"
c.Broadcast()
c.L.Unlock()
wg.Wait()
}
操场
话虽如此,如果情况允许,仍然建议使用频道传递数据。
注意: sync.WaitGroup
这里只用于等待goroutines完成执行。
6
2018-03-13 20:25
您需要确保调用c.Broadcast 后 你打电话给c.Wait。您的程序的正确版本将是:
package main
import (
"fmt"
"sync"
)
func main() {
m := &sync.Mutex{}
c := sync.NewCond(m)
m.Lock()
go func() {
m.Lock() // Wait for c.Wait()
c.Broadcast()
m.Unlock()
}()
c.Wait() // Unlocks m
}
https://play.golang.org/p/O1r8v8yW6h
4
2018-05-17 03:56
package main
import (
"fmt"
"sync"
"time"
)
func main() {
m := sync.Mutex{}
m.Lock() // main gouroutine is owner of lock
c := sync.NewCond(&m)
go func() {
m.Lock() // obtain a lock
defer m.Unlock()
fmt.Println("3. goroutine is owner of lock")
time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
c.Broadcast() // State has been changed, publish it to waiting goroutines
fmt.Println("4. goroutine will release lock soon (deffered Unlock")
}()
fmt.Println("1. main goroutine is owner of lock")
time.Sleep(1 * time.Second) // initialization
fmt.Println("2. main goroutine is still lockek")
c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
// Because you don't know, whether this is state, that you are waiting for, is usually called in loop.
m.Unlock()
fmt.Println("Done")
}
http://play.golang.org/p/fBBwoL7_pm
2
2018-04-26 08:14
答案:
OP回答了他自己,但没有直接回答原来的问题,我将发布如何正确使用 sync.Cond
。
你真的不需要 sync.Cond
如果每次写入和读取都有一个goroutine - 一个 sync.Mutex
他们之间的沟通就足够了。 sync.Cond
在多个读者等待共享资源可用的情况下可能很有用。
var sharedRsc = make(map[string]interface{})
func main() {
var wg sync.WaitGroup
wg.Add(2)
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc1"])
c.L.Unlock()
wg.Done()
}()
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc2"])
c.L.Unlock()
wg.Done()
}()
// this one writes changes to sharedRsc
c.L.Lock()
sharedRsc["rsc1"] = "foo"
sharedRsc["rsc2"] = "bar"
c.Broadcast()
c.L.Unlock()
wg.Wait()
}
操场
话虽如此,如果情况允许,仍然建议使用频道传递数据。
注意: sync.WaitGroup
这里只用于等待goroutines完成执行。
6
2018-03-13 20:25
您需要确保调用c.Broadcast 后 你打电话给c.Wait。您的程序的正确版本将是:
package main
import (
"fmt"
"sync"
)
func main() {
m := &sync.Mutex{}
c := sync.NewCond(m)
m.Lock()
go func() {
m.Lock() // Wait for c.Wait()
c.Broadcast()
m.Unlock()
}()
c.Wait() // Unlocks m
}
https://play.golang.org/p/O1r8v8yW6h
4
2018-05-17 03:56
package main
import (
"fmt"
"sync"
"time"
)
func main() {
m := sync.Mutex{}
m.Lock() // main gouroutine is owner of lock
c := sync.NewCond(&m)
go func() {
m.Lock() // obtain a lock
defer m.Unlock()
fmt.Println("3. goroutine is owner of lock")
time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
c.Broadcast() // State has been changed, publish it to waiting goroutines
fmt.Println("4. goroutine will release lock soon (deffered Unlock")
}()
fmt.Println("1. main goroutine is owner of lock")
time.Sleep(1 * time.Second) // initialization
fmt.Println("2. main goroutine is still lockek")
c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
// Because you don't know, whether this is state, that you are waiting for, is usually called in loop.
m.Unlock()
fmt.Println("Done")
}
http://play.golang.org/p/fBBwoL7_pm
2
2018-04-26 08:14
看起来你c.Wait for Broadcast,你的时间间隔永远不会发生。
同
time.Sleep(3 * time.Second) //Broadcast after any Wait for it
c.Broadcast()
你的片段似乎工作 http://play.golang.org/p/OE8aP4i6gY 我是否错过了你试图获得的东西?
2
2018-04-26 10:27
我终于找到了一种方法来做到这一点并没有涉及到 sync.Cond
完全 - 只是互斥体。
type Task struct {
m sync.Mutex
headers http.Header
}
func NewTask() *Task {
t := &Task{}
t.m.Lock()
go func() {
defer t.m.Unlock()
// ...do stuff...
}()
return t
}
func (t *Task) WaitFor() http.Header {
t.m.Lock()
defer t.m.Unlock()
return t.headers
}
这个怎么用?
互斥锁在任务开始时被锁定,确保任何调用 WaitFor()
会阻止。一旦标题可用并且互斥锁解锁互斥锁,每次调用 WaitFor()
将一次执行一个。所有未来的调用(即使在goroutine结束之后)都可以锁定互斥锁,因为它总是会被解锁。
1
2018-05-06 04:37
这是一个有两个例程的实际例子。他们一个接一个地开始,但是第二个人在继续之前等待第一个播出的条件:
package main
import (
"sync"
"fmt"
"time"
)
func main() {
lock := sync.Mutex{}
lock.Lock()
cond := sync.NewCond(&lock)
waitGroup := sync.WaitGroup{}
waitGroup.Add(2)
go func() {
defer waitGroup.Done()
fmt.Println("First go routine has started and waits for 1 second before broadcasting condition")
time.Sleep(1 * time.Second)
fmt.Println("First go routine broadcasts condition")
cond.Broadcast()
}()
go func() {
defer waitGroup.Done()
fmt.Println("Second go routine has started and is waiting on condition")
cond.Wait()
fmt.Println("Second go routine unlocked by condition broadcast")
}()
fmt.Println("Main go routine starts waiting")
waitGroup.Wait()
fmt.Println("Main go routine ends")
}
输出可能略有不同,因为第二个例程可以在第一个例程之前开始,反之亦然:
Main go routine starts waiting
Second go routine has started and is waiting on condition
First go routine has started and waits for 1 second before broadcasting condition
First go routine broadcasts condition
Second go routine unlocked by condition broadcast
Main go routine ends
https://gist.github.com/fracasula/21565ea1cf0c15726ca38736031edc70
1
2017-08-03 14:16