问题 如何将对堆栈变量的引用传递给线程?


我正在编写一个WebSocket服务器,Web客户端连接到多线程计算机AI上下棋。 WebSocket服务器想传递一个 Logger 对象进入AI代码。该 Logger 对象将管理从AI到Web客户端的日志行。该 Logger 必须包含对客户端连接的引用。

我对生命周期如何与线程交互感到困惑。我用a重现了这个问题 Wrapper struct由类型参数化。该 run_thread 函数尝试解包该值并记录它。

use std::fmt::Debug;
use std::thread;

struct Wrapper<T: Debug> {
    val: T,
}

fn run_thread<T: Debug>(wrapper: Wrapper<T>) {
    let thr = thread::spawn(move || {
        println!("{:?}", wrapper.val);
    });

    thr.join();
}

fn main() {
    run_thread(Wrapper::<i32> { val: -1 });
}

wrapper 参数存在于堆栈中,其生命周期并未延续 run_thread堆栈帧,即使线程将在堆栈帧结束之前连接。我可以从堆栈中复制值:

use std::fmt::Debug;
use std::thread;

struct Wrapper<T: Debug + Send> {
    val: T,
}

fn run_thread<T: Debug + Send + 'static>(wrapper: Wrapper<T>) {
    let thr = thread::spawn(move || {
        println!("{:?}", wrapper.val);
    });

    thr.join();
}

fn main() {
    run_thread(Wrapper::<i32> { val: -1 });
}

如果这不起作用 T 是一个我不想复制的大对象的引用:

use std::fmt::Debug;
use std::thread;

struct Wrapper<T: Debug + Send> {
    val: T,
}

fn run_thread<T: Debug + Send + 'static>(wrapper: Wrapper<T>) {
    let thr = thread::spawn(move || {
        println!("{:?}", wrapper.val);
    });

    thr.join();
}

fn main() {
    let mut v = Vec::new();
    for i in 0..1000 {
        v.push(i);
    }

    run_thread(Wrapper { val: &v });
}

结果如下:

error: `v` does not live long enough
  --> src/main.rs:22:32
   |
22 |     run_thread(Wrapper { val: &v });
   |                                ^ does not live long enough
23 | }
   | - borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...

我能想到的唯一解决方案是使用 Arc

use std::fmt::Debug;
use std::sync::Arc;
use std::thread;

struct Wrapper<T: Debug + Send + Sync + 'static> {
    arc_val: Arc<T>,
}

fn run_thread<T: Debug + Send + Sync + 'static>(wrapper: &Wrapper<T>) {
    let arc_val = wrapper.arc_val.clone();
    let thr = thread::spawn(move || {
        println!("{:?}", *arc_val);
    });

    thr.join();
}

fn main() {
    let mut v = Vec::new();
    for i in 0..1000 {
        v.push(i);
    }

    let w = Wrapper { arc_val: Arc::new(v) };
    run_thread(&w);

    println!("{}", (*w.arc_val)[0]);
}

在我的真实节目中,它似乎都是 Logger 并且必须放置连接对象 Arc 包装。看起来很烦人,客户端需要将连接打包成一个 Arc 当它是库的内部代码并行化时。这尤其令人讨厌,因为保证连接的生命周期大于工作线程的生命周期。

我错过了什么吗?


8798
2017-09-23 23:00


起源



答案:


标准库中的线程支持允许创建的线程比创建它们的线程更长;这是好事!但是,如果要将对堆栈分配的变量的引用传递给其中一个线程,则无法保证该变量在线程执行时仍然有效。在其他语言中,这将允许线程访问无效内存,从而产生一堆内存安全问题。

幸运的是,我们不仅限于标准库。至少有两个板条箱提供 范围的线程  - 保证在某个范围结束之前退出的线程。这些可以确保堆栈变量在整个线程持续时间内可用:

还有一些板条箱可以抽象出“线程”的低级细节,但是可以让你实现目标:

以下是各自的示例。每个例子都产生了许多线程并且在没有锁定的情况下改变了一个本地向量,没有 Arc,没有克隆。请注意,突变有一个 sleep 呼叫帮助验证呼叫是否并行发生。

您可以扩展示例以共享对任何实现类型的引用 Sync, 比如一个 Mutex 或者 Atomic*。然而,使用这些将引入锁定。

作用域,线程池

extern crate scoped_threadpool;
use scoped_threadpool::Pool;

use std::thread;
use std::time::Duration;

fn main() {
    let mut vec = vec![1, 2, 3, 4, 5];
    let mut pool = Pool::new(vec.len() as u32);

    pool.scoped(|scoped| {
        for e in &mut vec {
            scoped.execute(move || {
                thread::sleep(Duration::from_millis(1000));
                *e += 1;
            });
        }
    });

    println!("{:?}", vec);
}

横梁

extern crate crossbeam;

use std::thread;
use std::time::Duration;

fn main() {
    let mut vec = vec![1, 2, 3, 4, 5];

    crossbeam::scope(|scope| {
        for e in &mut vec {
            scope.spawn(move || {
                thread::sleep(Duration::from_millis(1000));
                *e += 1;
            });
        }
    });

    println!("{:?}", vec);
}

人造丝

extern crate rayon;

use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
use std::{thread, time::Duration};

fn main() {
    let mut vec = vec![1, 2, 3, 4, 5];

    vec.par_iter_mut().for_each(|e| {
        thread::sleep(Duration::from_millis(1000));
        *e += 1;
    });

    println!("{:?}", vec);
}

客户端需要在连接中打包连接 Arc 当它是库的内部代码并行化时

也许你可以更好地隐藏你的并行性?你能接受记录器,然后把它包装成一个 Arc/ Mutex 在把它交给你的线程之前?


11
2017-09-24 01:26



非常感谢你的回复!我的解决方案是制作 Logger 实行 Clone,并有一个类型的字段 Arc<Mutex<Connection>>。然后,用户可以将记录器的克隆传递给线程代码。用户无法转让所有权 Connection 对于线程代码(用户需要它用于其他目的),所以我没有看到线程代码可以方便地执行 Arc 和拳击代表用户。 - Ned Ruggeri


答案:


标准库中的线程支持允许创建的线程比创建它们的线程更长;这是好事!但是,如果要将对堆栈分配的变量的引用传递给其中一个线程,则无法保证该变量在线程执行时仍然有效。在其他语言中,这将允许线程访问无效内存,从而产生一堆内存安全问题。

幸运的是,我们不仅限于标准库。至少有两个板条箱提供 范围的线程  - 保证在某个范围结束之前退出的线程。这些可以确保堆栈变量在整个线程持续时间内可用:

还有一些板条箱可以抽象出“线程”的低级细节,但是可以让你实现目标:

以下是各自的示例。每个例子都产生了许多线程并且在没有锁定的情况下改变了一个本地向量,没有 Arc,没有克隆。请注意,突变有一个 sleep 呼叫帮助验证呼叫是否并行发生。

您可以扩展示例以共享对任何实现类型的引用 Sync, 比如一个 Mutex 或者 Atomic*。然而,使用这些将引入锁定。

作用域,线程池

extern crate scoped_threadpool;
use scoped_threadpool::Pool;

use std::thread;
use std::time::Duration;

fn main() {
    let mut vec = vec![1, 2, 3, 4, 5];
    let mut pool = Pool::new(vec.len() as u32);

    pool.scoped(|scoped| {
        for e in &mut vec {
            scoped.execute(move || {
                thread::sleep(Duration::from_millis(1000));
                *e += 1;
            });
        }
    });

    println!("{:?}", vec);
}

横梁

extern crate crossbeam;

use std::thread;
use std::time::Duration;

fn main() {
    let mut vec = vec![1, 2, 3, 4, 5];

    crossbeam::scope(|scope| {
        for e in &mut vec {
            scope.spawn(move || {
                thread::sleep(Duration::from_millis(1000));
                *e += 1;
            });
        }
    });

    println!("{:?}", vec);
}

人造丝

extern crate rayon;

use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
use std::{thread, time::Duration};

fn main() {
    let mut vec = vec![1, 2, 3, 4, 5];

    vec.par_iter_mut().for_each(|e| {
        thread::sleep(Duration::from_millis(1000));
        *e += 1;
    });

    println!("{:?}", vec);
}

客户端需要在连接中打包连接 Arc 当它是库的内部代码并行化时

也许你可以更好地隐藏你的并行性?你能接受记录器,然后把它包装成一个 Arc/ Mutex 在把它交给你的线程之前?


11
2017-09-24 01:26



非常感谢你的回复!我的解决方案是制作 Logger 实行 Clone,并有一个类型的字段 Arc<Mutex<Connection>>。然后,用户可以将记录器的克隆传递给线程代码。用户无法转让所有权 Connection 对于线程代码(用户需要它用于其他目的),所以我没有看到线程代码可以方便地执行 Arc 和拳击代表用户。 - Ned Ruggeri