Rust - Sharing data between threads

Previously published

This article was previously published on len-learns-rust.com. A full index of these articles can be found here.

Now that we can send messages to threads I want to see how we can access shared data from those threads. This isn’t the best design choice as shared data needs to be protected by locks so that it is accessed in an atomic fashion and the various threads involved with this data will contend with each other over the locks. It’s generally considered to be far better to pass the data between threads and only have one thread able to access the data at any one time. This can be summed up as:

Do not communicate by sharing memory; instead, share memory by communicating.1

However, so that I can begin to contemplate moving to this kind of style I would like to be able to see how shared data could work in Rust. To be able to pass some data to our thread we need to solve two problems. The first is that we need to convince the Rust compiler that the data will live as long as we need it to. For this we need to use a std::sync::Arc which allows us to link an atomic reference count with the data that we want to share.

If we try and share data without the std::sync::Arc, like this, perhaps:

    #[test]
    fn test_channel_thread_with_shared_data() {

        let mut data = HashSet::<String>::new();

        let mut thread = ChannelThread::new( |message| {
            println!("got message {}", message);

            data.insert(message);

            return true;
        });

we end up with this error:

Compiler says "no!"
error[E0373]: closure may outlive the current function, but it borrows `data`, which is owned by the current function
  --> src/lib.rs:87:46
   |
87 |         let mut thread = ChannelThread::new( |message| {
   |                                              ^^^^^^^^^ may outlive borrowed value `data`
...
90 |             data.insert(message);
   |             ---- `data` is borrowed here
   |
note: function requires argument type to outlive `'static`
  --> src/lib.rs:87:26
   |
87 |           let mut thread = ChannelThread::new( |message| {
   |  __________________________^
88 | |             println!("got message {}", message);
89 | |
90 | |             data.insert(message);
91 | |
92 | |             return true;
93 | |         });
   | |__________^
help: to force the closure to take ownership of `data` (and any other referenced variables), use the `move` keyword
   |
87 |         let mut thread = ChannelThread::new( move |message| {
   |                                              ++++

Adding the reference count fixes the lifetime issue, but now we have another problem:

    #[test]
    fn test_channel_thread_with_shared_data_with_arc() {

        let mut data = Arc::new(HashSet::<String>::new());

        let mut shared_data = Arc::clone(&data);

        let mut thread = ChannelThread::new( |message| {
            println!("got message {}", message);

            data.insert(message);

            return true;
        });
Compiler says "no!"
error[E0596]: cannot borrow data in an `Arc` as mutable
   --> src/lib.rs:133:13
    |
133 |             data.insert(message);
    |             ^^^^^^^^^^^^^^^^^^^^ cannot borrow as mutable
    |
    = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<HashSet<String>>`

and the documentation for std::sync::Arc tells us exactly what we need to do:

Shared references in Rust disallow mutation by default, and Arc is no exception: you cannot generally obtain a mutable reference to something inside an Arc. If you need to mutate through an Arc, use Mutex, RwLock, or one of the Atomic types.

So, much like we did with the Id Manager when we fixed the interior mutability problem, we end up using a std::sync::Mutex to allow us to share something as immutable and then, in controlled conditions, obtain a mutable reference from it.

This results in the final code:

    #[test]
    fn test_channel_thread_with_shared_data_locked() {

        let data = Arc::new(Mutex::new(HashSet::<String>::new()));

        let shared_data = Arc::clone(&data);

        let mut thread = ChannelThread::new(move |message| {
            println!("got message {}", message);

            shared_data.lock().expect("failed to lock").insert(message);

            return true;
        });

We obtain a mutable reference from shared_data by calling std::sync::Mutex::lock() and we can then access the data as mutable in both the spawned thread and the main thread.

        for i in 1..15 {
            println!("sending {} to thread", i);
            thread.send(i.to_string());

            let data = data.lock().expect("failed to lock");

            println!("data contains: {} ", data.len());

            sleep(Duration::from_millis(10));
        }

We could, of course, use this to access the shared data from multiple spawned threads if necessary, but all would contend for the lock.

Using the “share memory by communicating” approach we could do this, instead:

    #[test]
    fn test_thread_share_data_by_communicating() {
        let mut data = HashSet::<String>::new();

        let (to_thread, from_controller) = mpsc::channel::<String>();
        let (to_controller, from_thread) = mpsc::channel::<HashSet<String>>();

        let thread = thread::spawn(move || {
            println!("spawned thread has started");

            println!("spawned is running");

            loop {
                match from_controller.recv() {
                    Ok(message) => {
                        data.insert(message);
                    }
                    Err(reason) => {
                        println!("thread recv error {}", reason);

                        to_controller.send(data).expect("failed to send data");
                        break;
                    }
                }
            }
            println!("spawned thread is done");
        });

        for i in 1..15 {
            println!("sending {} to thread", i);
            to_thread
                .send(i.to_string())
                .expect("failed to send to thread");
        }

        println!("close channel, signal thread we're done");

        drop(to_thread);

        println!("wait for thread to end");

        thread.join().expect("failed to join with thread");

        if let Ok(data) = from_thread.recv() {
            dump_data(&data);
        } else {
            println!("failed to get data from thread");
        }

        println!("all done...");
    }

Here we pass the data straight to the thread and operate on it solely within the thread until the thread is finished. Once the thread is done it returns the data to the controller via a second channel. The controller can then extract the data and use it. No locking is required but the data cannot be accessed concurrently. Often this is a considerably better approach to software design.

Join in

The code can be found here on GitHub each step on the journey will have one or more separate directories of code, so this article’s code is here and here this allows for easy comparison of changes at each stage.

Of course, there may be a better way; leave comments if you’d like to help me learn.