The Actor pattern with async Rust

Posted February 21, 2022 by Richard Leek ‐ 4 min read

This weekend I listened to a podcast with one of the maintainers of tokio and learned how I could clean up my server

Over the weekend I listened to a podcast with one of the maintainers of tokio, the async runtime crate I’m using for reoserv. She wrote a blog post about the “Actor” model, and it showed me how I can refactor my server to be a lot more modular and cleaner.

I was kind of halfway there with my implementation. For every connection to the server I spawn a tokio task (a green thread) that owns the underlying TCP socket.

loop {
    let (socket, addr) = listener.accept().await.unwrap();

    // these clones look expensive but they're just reference-counted mutexes so they're cheap
    let players = players.clone();
    let active_account_ids = active_account_ids.clone();
    let world = world.clone();

    let num_of_players = players.lock().await.len();
    if num_of_players >= SETTINGS.server.max_connections as usize {
        warn!("{} has been disconnected because the server is full", addr);
        continue;
    }

    info!(
        "connection accepted ({}) {}/{}",
        addr,
        num_of_players + 1,
        SETTINGS.server.max_connections
    );

    let pool = pool.clone();

    // Spawn a new task to handle each player async
    tokio::spawn(async move {
        if let Err(e) = handle_player(world, players, active_account_ids, socket, pool).await {
            error!("there was an error processing player: {:?}", e);
        }
    });
}

For the main loop to communicate with the task that owns the socket I created a multi-producer, single-consumer (mpsc) channel, which comes as two halves: one for writing and one for reading. The “player” data structure owns the read half, and the main server function owns a mutex-guarded hash map from player id to write half.

impl Player {
    pub async fn new(players: Players, socket: TcpStream, player_id: EOShort) -> Self {
        // this creates the channel used to send messages to the player task
        let (tx, rx) = mpsc::unbounded_channel();

        // we insert the write half into the hashmap owned by the main server function
        players.lock().await.insert(player_id, tx);

        Self {
            rx,
            bus: PacketBus::new(socket),
            state: State::Uninitialized,
            account_id: 0,
            character_id: 0,
            num_of_characters: 0,
        }
    }
}
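
To make the role of the two halves concrete, here is a minimal sketch of that sender registry. It uses std::sync::mpsc and a plain thread in place of tokio's channel and task so that it runs without any dependencies. The names `PlayerMessage`, `spawn_player` and `send_to` are made up for illustration and are not reoserv's.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Hypothetical message type; the real server sends packets instead.
#[derive(Debug)]
pub enum PlayerMessage {
    Text(String),
    Shutdown,
}

// Shared map of player id -> write half, owned by the main server function.
pub type Players = Arc<Mutex<HashMap<u16, Sender<PlayerMessage>>>>;

// Spawns a player "task" (a thread in this sketch) that owns the read half.
// The thread returns every text message it received once it stops.
pub fn spawn_player(players: &Players, player_id: u16) -> JoinHandle<Vec<String>> {
    let (tx, rx) = mpsc::channel();

    // Register the write half so the rest of the server can reach this player.
    players.lock().unwrap().insert(player_id, tx);

    thread::spawn(move || {
        let mut received = Vec::new();
        // recv() returns Err once every sender has been dropped, ending the loop.
        while let Ok(message) = rx.recv() {
            match message {
                PlayerMessage::Text(text) => received.push(text),
                PlayerMessage::Shutdown => break,
            }
        }
        received
    })
}

// Looks up the write half for `player_id` and sends a message through it.
// Returns false if the player is unknown or its task has already exited.
pub fn send_to(players: &Players, player_id: u16, message: PlayerMessage) -> bool {
    let guard = players.lock().unwrap();
    match guard.get(&player_id) {
        Some(tx) => tx.send(message).is_ok(),
        None => false,
    }
}
```

Only the player thread ever touches `received`, so the state itself needs no lock. The mutex guards just the map of senders.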

The part I was missing is that I can create channels for single-purpose “get” operations.

There’s a lot more data that the player structure owns that I need access to within some of the packet handlers. What I’ve been doing until now is just passing this data down to the handle_packet function, whose argument list is getting pretty big (11 arguments).

if let Some(packet) = queue.get_mut().pop_front() {
    let db_pool = db_pool.clone();
    match handle_packet(
        player_id,
        packet,
        &mut player.bus,
        world.clone(),
        players.clone(),
        active_account_ids.clone(),
        db_pool,
        &player_ip,
        player.account_id,
        player.num_of_characters,
        &character,
    )
    .await
    {
        Ok(()) => {}
        Err(e) => {
            error!("error handling packet: {:?}", e);
        }
    }
}

What I can do instead is create new message types for the player channel that carry the sending half of a one-shot channel, and then just await the response in the middle of the packet handler.

For example, during the initial handshake between the client and server I need to tell the client some information about how the packets I send them are going to be encoded. I pass this data down to the init handler like so:

match family {
    Family::Init => match action {
        Action::Init => {
            handlers::init::init(
                buf,
                player_id,
                bus.sequencer.get_init_sequence_bytes(), // <- data I need to tell the client about
                bus.packet_processor.decode_multiple, // <- data I need to tell the client about
                bus.packet_processor.encode_multiple, // <- data I need to tell the client about
                players.lock().await.get(&player_id).unwrap(), // <- already providing the write half of the player channel
            )
            .await?;
        }
        _ => {
            error!("Unhandled packet {:?}_{:?}", action, family);
        }
    },

What I could instead do is just send a new kind of message from inside the init handler like so:

let (send, recv) = oneshot::channel();
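// tx is the unbounded sender created in Player::new; its send() is not async, so there is nothing to await
let _ = tx.send(Command::GetSequenceBytes(send));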
let sequence_bytes = recv.await.expect("Player task has been killed");

// do rest like normal
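
The snippet above is only the requesting side. The player task also has to match on the new message and answer through the sender it was handed. Here is a minimal sketch of both sides, using std::sync::mpsc and a thread in place of tokio's channels and tasks so it runs without dependencies. A plain mpsc channel stands in for the one-shot channel. The `SequenceBytes` type, the `Close` variant and the function names are assumptions made for illustration.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

// Assumed payload type; the real sequencer may return something different.
pub type SequenceBytes = (u16, u8);

pub enum Command {
    // Request/response: the caller supplies the channel to answer on.
    GetSequenceBytes(Sender<SequenceBytes>),
    // Hypothetical command that ends the player task.
    Close,
}

// The player task. It alone owns `sequence_bytes`, so no lock is needed.
pub fn run_player(rx: Receiver<Command>, sequence_bytes: SequenceBytes) {
    while let Ok(command) = rx.recv() {
        match command {
            Command::GetSequenceBytes(reply) => {
                // The requester may have stopped waiting; ignore that error.
                let _ = reply.send(sequence_bytes);
            }
            Command::Close => break,
        }
    }
}

// Starts the player task on its own thread and returns the write half.
pub fn spawn_player_task(
    sequence_bytes: SequenceBytes,
) -> (Sender<Command>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || run_player(rx, sequence_bytes));
    (tx, handle)
}

// Requesting side, mirroring the snippet from the post.
// Returns None if the player task is gone.
pub fn request_sequence_bytes(tx: &Sender<Command>) -> Option<SequenceBytes> {
    let (send, recv) = mpsc::channel();
    tx.send(Command::GetSequenceBytes(send)).ok()?;
    // Err here means the task dropped the reply sender without answering.
    recv.recv().ok()
}
```

Returning an Option instead of calling expect lets the handler decide what to do when the player task has already gone away.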

I might even experiment with creating a “handle” struct around the player struct where I can write some helper functions that do all of the channel communications for you. I think the code would come out looking cleaner. That would look something more like this:

let sequence_bytes = player_handle.get_sequence_bytes().await?;

// do rest like normal
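
A handle along those lines might be sketched as follows. This is not reoserv's code. It again swaps tokio for std::sync::mpsc and a thread so it runs on its own, and `PlayerHandle::spawn`, `PlayerGone`, `SequenceBytes` and `describe_init` are hypothetical names. With tokio the helper would be an async fn and the call site would add `.await`.

```rust
use std::fmt;
use std::sync::mpsc::{self, Sender};
use std::thread;

// Assumed payload type; the real sequencer may return something different.
pub type SequenceBytes = (u16, u8);

// Private to the module: callers only ever see the helper methods.
enum Command {
    GetSequenceBytes(Sender<SequenceBytes>),
}

// Returned when the player task is no longer running.
#[derive(Debug, PartialEq)]
pub struct PlayerGone;

impl fmt::Display for PlayerGone {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "player task has exited")
    }
}

impl std::error::Error for PlayerGone {}

// Cheap to clone: every clone is another sender to the same player task.
#[derive(Clone)]
pub struct PlayerHandle {
    tx: Sender<Command>,
}

impl PlayerHandle {
    // Spawns the player task and returns a handle to it.
    pub fn spawn(sequence_bytes: SequenceBytes) -> Self {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            // The loop ends once every handle has been dropped.
            while let Ok(command) = rx.recv() {
                match command {
                    Command::GetSequenceBytes(reply) => {
                        let _ = reply.send(sequence_bytes);
                    }
                }
            }
        });
        PlayerHandle { tx }
    }

    // Hides the request/response channel plumbing behind a method call.
    pub fn get_sequence_bytes(&self) -> Result<SequenceBytes, PlayerGone> {
        let (send, recv) = mpsc::channel();
        self.tx
            .send(Command::GetSequenceBytes(send))
            .map_err(|_| PlayerGone)?;
        recv.recv().map_err(|_| PlayerGone)
    }
}

// A handler now needs only the handle, not each piece of player state.
pub fn describe_init(player: &PlayerHandle) -> Result<String, PlayerGone> {
    let sequence_bytes = player.get_sequence_bytes()?;
    Ok(format!("seq1={} seq2={}", sequence_bytes.0, sequence_bytes.1))
}
```

Because `Command` stays private, new "get" operations can be added as methods on the handle without changing any handler's argument list.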

Anyway - I’m very excited to try implementing this pattern in my server! Thanks to https://rustacean-station.org/ and Alice Ryhl for sharing this design pattern :)