How to handle websocket disconnect? #1066

@mantou132

I'm new to juniper. I have built a friend notification system with Subscription: when a user establishes a websocket connection, that user is considered online.

I want to mark the user as offline when the websocket connection goes down. What should I do?

#[graphql_subscription(context = Context)]
impl Subscription {
    async fn friend_sys(context: &Context) -> FriendSysStream {
        let conn = context.dbpool.get().unwrap();

        // TODO: offline
        update_user_status(&conn, context.user_id, ScUserStatus::Online);
        notify_ids(
            get_friend_ids(&conn, context.user_id),
            ScNotifyMessage::update_user(get_user_basic(&conn, context.user_id)),
        );

        let mut rx = get_receiver(context.user_id);
        let stream = async_stream::stream! {
            loop {
                let result = rx.recv().await.unwrap();
                yield Ok(result)
            }
        };
        Box::pin(stream)
    }
}

pub async fn subscriptions(
    req: HttpRequest,
    schema: web::Data<Schema>,
    pool: web::Data<Pool>,
    secret: web::Data<String>,
    stream: web::Payload,
) -> Result<HttpResponse, Error> {
    let schema = schema.into_inner();
    subscriptions_handler(req, stream, schema, |params: Variables| async move {
        // Accept either spelling of the connection parameter.
        let authorization = params
            .get("authorization")
            .or_else(|| params.get("Authorization"))
            .unwrap_or(&InputValue::Null);
        let user = match authorization {
            InputValue::Scalar(DefaultScalarValue::String(auth_string)) => {
                UserToken::parse(&secret, extract_token_from_str(&auth_string))
            }
            _ => None,
        };
        let user_id = match user {
            Some(id) => id,
            None => return Err(error::ErrorUnauthorized("Unauthorized")),
        };
        let ctx = Context {
            user_id,
            dbpool: pool.get_ref().to_owned(),
        };
        let config = ConnectionConfig::new(ctx).with_keep_alive_interval(Duration::from_secs(15));
        Ok(config) as Result<ConnectionConfig<Context>, Error>
    })
    .await
}

ilslv (Member) commented on May 17, 2022

@mantou132 I see 2 different possibilities for implementing this feature:

  1. Use a custom Stream with a Drop impl that signals when the subscription stops and the user goes offline. The main problem with this approach is that it relies on Drop actually being called, which may not happen, for example if the server crashes. Also, Drop itself can't run async code, so you'll have to work around that (perhaps by communicating with a detached spawned task via channels).
  2. A more reliable approach is heartbeats. While the subscription is open, you update some persistent storage at a fixed interval, for example every 1 second. A background cron process looks for stale heartbeats that haven't been updated in, say, 3 seconds and considers those users offline. So even if a server crashes, another instance will do the cleanup. This approach can be combined with the previous one for faster response times: in addition to the cron job, the system can also react to dropped subscriptions.
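
A minimal sketch of the first option, using only the standard library so it runs without dependencies. The names `OnlineGuard`, `PresenceEvent`, and `run_worker` are hypothetical, and a plain thread with `std::sync::mpsc` stands in for a detached async task. In a real subscription the guard would be moved into the stream (for example, captured inside the `async_stream::stream!` block) so that it is dropped together with the stream.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical event handed to the background worker.
#[derive(Debug, PartialEq)]
enum PresenceEvent {
    Offline(u64),
}

/// Guard that reports `Offline` when it is dropped.
struct OnlineGuard {
    user_id: u64,
    tx: Sender<PresenceEvent>,
}

impl Drop for OnlineGuard {
    fn drop(&mut self) {
        // `drop` cannot await, so it only hands the event off.
        // The send error is ignored: the worker may already be gone.
        let _ = self.tx.send(PresenceEvent::Offline(self.user_id));
    }
}

/// Worker loop. A real one would update the database and notify friends;
/// this one just collects events until every sender has been dropped.
fn run_worker(rx: Receiver<PresenceEvent>) -> Vec<PresenceEvent> {
    rx.into_iter().collect()
}

fn main() {
    let (tx, rx) = channel();
    let worker = thread::spawn(move || run_worker(rx));

    {
        let _guard = OnlineGuard { user_id: 42, tx: tx.clone() };
        // ... the subscription stream would live here ...
    } // guard dropped here: the worker receives Offline(42)

    drop(tx); // close the channel so the worker loop ends
    let events = worker.join().unwrap();
    assert_eq!(events, vec![PresenceEvent::Offline(42)]);
}
```

As noted above, this only fires when the destructor actually runs, so it does not cover a server crash.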

mantou132 (Author) commented on May 17, 2022

Thank you for your reply.

I plan to use heartbeats to implement this feature; that approach really is more reliable and simpler.
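
A rough sketch of the heartbeat bookkeeping, assuming an in-memory `HashMap` as a stand-in for the persistent storage mentioned above. The `Heartbeats` type and its `beat` and `sweep` methods are hypothetical names. Time is passed in explicitly so the logic can be exercised without sleeping.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// In-memory stand-in for the persistent heartbeat store.
struct Heartbeats {
    last_seen: HashMap<u64, Instant>,
    timeout: Duration,
}

impl Heartbeats {
    fn new(timeout: Duration) -> Self {
        Self { last_seen: HashMap::new(), timeout }
    }

    /// Called periodically while a subscription is open.
    fn beat(&mut self, user_id: u64, now: Instant) {
        self.last_seen.insert(user_id, now);
    }

    /// Called by the periodic cleanup job. Removes and returns the users
    /// whose last heartbeat is older than `timeout`, sorted by id.
    fn sweep(&mut self, now: Instant) -> Vec<u64> {
        let timeout = self.timeout;
        let mut stale: Vec<u64> = self
            .last_seen
            .iter()
            .filter(|(_, seen)| now.saturating_duration_since(**seen) > timeout)
            .map(|(&id, _)| id)
            .collect();
        stale.sort_unstable();
        for id in &stale {
            self.last_seen.remove(id);
        }
        stale
    }
}

fn main() {
    let start = Instant::now();
    let mut hb = Heartbeats::new(Duration::from_secs(3));

    hb.beat(1, start);
    hb.beat(2, start);
    hb.beat(2, start + Duration::from_secs(2)); // user 2 keeps beating

    // Four seconds in, only user 1 has missed the 3-second window.
    let offline = hb.sweep(start + Duration::from_secs(4));
    assert_eq!(offline, vec![1]);
}
```

The users returned by `sweep` are the ones to mark offline and announce to friends. With shared persistent storage, any server instance can run the sweep.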

In my case, a websocket disconnect does not mean the user is offline, as the user may have multiple clients. Since I use a tokio broadcast channel, the number of receivers can be used to determine whether the user is offline.
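
With tokio's broadcast channel, `Sender::receiver_count` reports how many receivers are currently alive. The sketch below models the same idea with the standard library only, so it runs without dependencies: an explicit per-user connection count, where the user counts as offline only once the last connection is dropped. `ConnectionRegistry` and `Connection` are hypothetical names, not part of juniper or tokio.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical registry of open connections per user.
#[derive(Clone, Default)]
struct ConnectionRegistry {
    counts: Arc<Mutex<HashMap<u64, usize>>>,
}

/// One live client connection; dropping it releases its slot.
struct Connection {
    user_id: u64,
    registry: ConnectionRegistry,
}

impl ConnectionRegistry {
    /// Registers one more client for `user_id`.
    fn connect(&self, user_id: u64) -> Connection {
        *self.counts.lock().unwrap().entry(user_id).or_insert(0) += 1;
        Connection { user_id, registry: self.clone() }
    }

    /// A user is online while at least one connection is alive.
    fn is_online(&self, user_id: u64) -> bool {
        self.counts.lock().unwrap().contains_key(&user_id)
    }
}

impl Drop for Connection {
    fn drop(&mut self) {
        // Skip the cleanup if the mutex is poisoned rather than panic in drop.
        if let Ok(mut counts) = self.registry.counts.lock() {
            let remaining = match counts.get_mut(&self.user_id) {
                Some(n) => {
                    *n -= 1;
                    *n
                }
                None => return,
            };
            if remaining == 0 {
                // Last client gone: this is where "offline" would be reported.
                counts.remove(&self.user_id);
            }
        }
    }
}

fn main() {
    let registry = ConnectionRegistry::default();
    let phone = registry.connect(1);
    let laptop = registry.connect(1);

    drop(phone);
    assert!(registry.is_online(1)); // the laptop is still connected

    drop(laptop);
    assert!(!registry.is_online(1)); // last client gone
}
```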

added this to the 0.16.0 milestone on May 17, 2022
Participants: @mantou132, @tyranron, @ilslv

        How to handle websocket disconnect? · Issue #1066 · graphql-rust/juniper