Rust: работа с потоками



Книга Rust: работа с потоками

Жил я себе поживал раньше без забот и без хлопот в однопоточной счастливой стране JavaScript, где имел дело с потоками разве что при взаимодействии между веб-сайтом и расширением для Chrome. Поэтому, когда кто-то заводил разговор о трудностях параллелизма и конкурентности, я никогда по-настоящему не понимал, из-за чего весь сыр-бор.

Я начал изучать Rust несколько недель назад, переписывая текстовую игру, которую до этого сделал с помощью Vue. Это игра на выживание, в которой нужно собирать и изготавливать предметы, чтобы есть и пить. Условие победы здесь одно — постараться продержаться как можно большее количество дней. Мне удалось привести в рабочее состояние большинство игровых функций, но была досадная ошибка: если пользователь уходил на несколько часов из игры, нельзя было проверить статистику, пока он не вернётся. А иногда проходило по нескольку месяцев без изменений!

Я знал, что эту проблему можно решить с помощью потоков, поэтому наконец набрался смелости и прочитал главу «Многопоточность без страха» из «Языка программирования Rust» от авторов Steve Klabnik, Carol Nichols и участников сообщества Rust.

Итак, мне нужно было получить возможность отслеживать статистику и вести подсчёт дней каждые несколько секунд, а также уведомлять игрока, как только статистика достигнет 0. Создать новый поток, который запускает какой-то код каждые 10 секунд, было легко:

thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(10));
        println!("Now we should decrease stats and update day count…");
    });

Но как изменить статистику и подсчёт дней из этого потока, избежав проблем с владением?

Всё оказалось намного проще, чем я ожидал, ведь можно создать мьютекс (взаимное исключение), чтобы в каждый конкретный момент времени только один поток мог получить доступ к этим данным. Завладеть этим мьютексом будет стремиться несколько потоков, поэтому нужно обернуть его в Arc (автоматический подсчёт ссылок), чтобы код работал правильно (всё это гораздо лучше объяснено в главе «Многопоточное разделяемое состояние»). Код в конечном итоге выглядит так:

fn main() {
        let stats = Arc::new(Mutex::new(Stats {
            water: Stat::new(100.0),
            food: Stat::new(100.0),
            energy: Stat::new(100.0),
        }));

        let days = Arc::new(Mutex::new(0));

        control_time(&days, &stats);

        // ...
    }

    fn control_time(days: &Arc<Mutex<i32>>, stats: &Arc<Mutex<Stats>>) {
        let now = Instant::now();
        let days = Arc::clone(&days);
        let stats = Arc::clone(&stats);
        thread::spawn(move || loop {
            thread::sleep(Duration::from_secs(10));

            let elapsed_time = now.elapsed().as_secs();
            let mut elapsed_days = days.lock().unwrap();
            *elapsed_days = elapsed_time as i32 / 60;

            let mut stats_lock = stats.lock().unwrap();
            decrease_stats(&mut stats_lock, 10.0);
        });
    }

В основном потоке можем оставить days (дни) и stats (статистику) и просто добавить .lock().

И всё работает хорошо… Но проблема, похоже, осталась: основной поток занят в ожидании данных от пользователя! Несмотря на то, что статистика и подсчёт дней успешно обновляются каждые 10 секунд, основной поток ни сном ни духом об этом не ведает. Пришло время добавить ещё один поток!

Этот поток должен обработать данные от пользователя и отправить действие в основной поток. Для этого лучше использовать другой способ передачи данных между потоками, который изложен в соседней главе «Языка программирования Rust» о передаче сообщений.

Для отправки действия в основной поток мне нужен был канал, а основной поток должен был дать знать новому потоку с данными от пользователя, когда он готов принять действие (так как на некоторые действия нужно время). Каналы, которые Rust предлагает в стандартной библиотеке, идут по типу несколько производителей — один потребитель. Это означает, что между каналами двустороннее взаимодействие отсутствует, поэтому в итоге я создал два канала:

let (tx, rx) = mpsc::channel();
let (tx2, rx2) = mpsc::channel();

Я особо не заморачивался с именами.

Теперь создаём новый поток, который будет ждать сигнала от основного потока, запросит данные от пользователя и отправит их обратно в основной поток. Обратите внимание, что rx2.recv() блокирует поток до тех пор, пока не будет получено сообщение: это позволит нам контролировать, когда отправить запрос данных пользователю.

thread::spawn(move || loop {
        let _ = rx2.recv();

        let action = request_input("\nWhat to do?");

        tx.send(action).ok();
    });

Затем из основного потока отправляем сообщение для запроса входных данных и переходим к созданию цикла, который будет постоянно проверять статистику и данные от пользователя с помощью rx.try_recv() (это не блокирует поток). Если статистика достигает 0, цикл обрывается, завершая игру. Если не достигает 0, снова запрашиваем входные данные.

tx2.send(String::from("Ready for input")).ok();

    loop {
        if let Ok(action) = rx.try_recv() {
            match action.trim() {
                // обрабатываются все возможные действия
            }
        }
        if is_game_over(&stats.lock().unwrap()) {
            break;
        } else {
            tx2.send(String::from("Ready for input")).ok();
        }
    }

Для меня это было совершенно естественно: всё равно что отправить событие в JavaScript! Но вообще-то не совсем.

Когда вы отправляете событие в JavaScript, никому нет дела, слушает кто-то это событие или нет. Вы отправляете его, и это сообщение навсегда теряется, если нет слушателя.

В стране Rust, если в лесу падает дерево, должен быть кто-то, кто это услышит. В противном случае лес запаникует и сожжёт сам себя, а мир взорвётся. А если этот кто-то занят другими делами, все деревья будут ждать в очереди и не упадут, пока этот кто-то не начнет их слышать. Вот что у нас происходило:

Входной поток не ждёт сообщения о готовности ready от основного потока. Однако проблема в том, что основной поток постоянно отправляет сообщения. Не забывайте, что действие action прослушивается через try_recv, поэтому оно не блокируется. Даже в том случае, когда пользователь вводит sleep, основной поток реально спит в течение нескольких секунд из-за того, что мы уже завалили его сообщениями, и входной поток получает сообщения одно за другим. Для тех, кто привык к другим языкам, это может показаться естественным, но не пришельцам из JavaScript: это взорвало мне мозг и долгое время не укладывалось в голове.

В итоге я нашёл простое решение: отправлять сообщение о готовности ready только после того, как получено и обработано последнее сообщение:

tx2.send(String::from("Ready for input")).ok();

    loop {
        if let Ok(action) = rx.try_recv() {
            match action.trim() {
                // обрабатываются все возможные действия
            }
            // теперь мы готовы к другому действию:
            tx2.send(String::from("Ready for input")).ok();
        }
        if is_game_over(&stats.lock().unwrap()) {
            break;
        }
    }

Вот и всё, все проблемы решены! 

За те несколько недель, что я изучаю Rust, трудным мне показался не сам язык — трудно было оставить прежний образ мыслей JavaScript-разработчика (смелее переходите с JavaScript на выбранный вами язык). Именно это мне больше всего и нравится — выходить из зоны комфорта! Загляните в код игры здесь: https://github.com/codegram/live-rust

47   0  
    Ничего не найдено.

Добавить ответ:
Отменить.