Build a Real-time Notification API with Apollo Server and GraphQL Subscriptions

Imagine receiving instant updates about new messages, friend requests, or activity notifications without refreshing the page. Real-time notifications keep users informed as events happen, which helps with engagement, satisfaction, and retention.

We can add real-time updates to our GraphQL API using GraphQL subscriptions. In this article, we will focus on implementing the Subscription type, which is the building block for real-time notifications.

What are GraphQL subscriptions?

Subscriptions are a GraphQL feature that allows a server to send data to its clients when a specific event happens. Subscriptions are just part of your GraphQL contract, and they refer to events. To send these events in real time, you need to choose a transport that supports server-initiated messages.
Because the server pushes subscription updates to the client (instead of the client polling for them), subscriptions generally use the WebSocket protocol instead of HTTP.

Set up the GraphQL server

To support subscriptions, the same HTTP server has to serve both the GraphQL endpoint and a WebSocket server, so we run Apollo Server as Express middleware instead of using the standalone server. We already know how to set up a GraphQL server using the standalone configuration (covered in the earlier article on setting up a GraphQL server). So in this section, we only discuss the changes needed for Express compatibility. The only file that needs to change is server.ts.

Here is the GitHub link: https://github.com/icon-gaurav/mastering-graphql-with-nodejs/tree/graphql-subscriptions

You can clone this and follow along with my explanation.

import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { createServer } from 'http';
import express from 'express';
import { makeExecutableSchema } from '@graphql-tools/schema';
import cors from 'cors';
import resolvers from './graphql/resolvers'
import typeDefs from "./graphql/schema";
import mongoose from "mongoose";

async function startServer(){
    const app = express();

    const httpServer = createServer(app);

    const schema = makeExecutableSchema({
        resolvers,
        typeDefs,
    });

    const apolloServer = new ApolloServer({
        schema,
        introspection:true,
    });

    await apolloServer.start();

    app.use('/graphql', cors<cors.CorsRequest>(), express.json(), expressMiddleware(apolloServer));

    const DATABASE_URL: string = `mongodb://localhost:27017/test`;
    mongoose.connect(DATABASE_URL)
        .then(() => {
            console.log('Database connected successfully')
        })
        .catch((e: any) => {
            console.log('Error connecting : ', e?.message)
        })

    const PORT = 4000;
    // Now that our HTTP server is fully set up, we can listen to it.
    httpServer.listen(PORT, () => {
        console.log(`Server is now running on http://localhost:${PORT}/graphql`);
    });
}

startServer();

To start the server, run this command: npm run start:dev

The server will start on port 4000 using the Express middleware.

Define subscription in the schema

The schema's Subscription type defines the top-level fields that a client can subscribe to. Defining it works the same way as defining the Query and Mutation types.

# Subscription type definition in GraphQL Schema
type Subscription {
    postCreated: Post
}

# Return type of the Subscription
type Post {
    _id: ID!
    title: String!
    content: String!
}

Whenever a new post is created in the backend, the postCreated field sends the newly created post to every client that is subscribed to it.
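
For illustration, a client could subscribe to this field with an operation like the one below. This is a minimal sketch: the operation name `OnPostCreated`, the `PostCreatedEvent` interface, and the sample values are assumptions made for this example and do not come from the repository.

```typescript
// Shape of the Post type from the schema above.
interface Post {
  _id: string;
  title: string;
  content: string;
}

// Hypothetical helper type: the `data` of each event for this subscription.
// postCreated is nullable in the schema, hence `Post | null`.
interface PostCreatedEvent {
  postCreated: Post | null;
}

// Operation document a client sends to subscribe.
// `OnPostCreated` is an arbitrary operation name chosen for this sketch.
const POST_CREATED_SUBSCRIPTION = `
  subscription OnPostCreated {
    postCreated {
      _id
      title
      content
    }
  }
`;

// Example of an event a subscribed client could receive (illustrative values).
const exampleEvent: PostCreatedEvent = {
  postCreated: { _id: '1', title: 'Hello', content: 'First post' },
};
```

The client only receives the fields it selects, exactly as with a query.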

Enable subscriptions

To enable subscriptions, we have to start a WebSocket server. Here are the steps to start the GraphQL WebSocket server:

  1. Install graphql-ws, ws, and @graphql-tools/schema

     npm install --save graphql-ws ws @graphql-tools/schema
    
  2. Create a WebSocketServer instance to use as the subscription server

     import { WebSocketServer } from 'ws';
     // Import path for graphql-ws v5; v6 and later expose it as 'graphql-ws/use/ws'.
     import { useServer } from 'graphql-ws/lib/use/ws';

     // Creating the WebSocket server
     const wsServer = new WebSocketServer({
         // This is the `httpServer` we created in a previous step.
         server: httpServer,
         // Subscriptions are served at ws://localhost:4000/subscriptions,
         // separate from the /graphql HTTP endpoint used by expressMiddleware.
         path: '/subscriptions',
     });
    
  3. Pass the schema and the WebSocket server to useServer so that it starts handling subscriptions, then add a plugin to the ApolloServer constructor that disposes of the WebSocket server alongside the HTTP server. This ensures that both servers shut down cleanly.

    
         const serverCleanup = useServer({ schema }, wsServer);
    
         const apolloServer = new ApolloServer({
             schema,
             introspection:true,
             plugins: [
                 // Proper shutdown for the HTTP server.
                 ApolloServerPluginDrainHttpServer({ httpServer }),
    
                 // Proper shutdown for the WebSocket server.
                 {
                     async serverWillStart() {
                         return {
                             async drainServer() {
                                 await serverCleanup.dispose();
                             },
                         };
                     },
                 },
             ],
         });
    

Here is the fully configured server.ts file.

import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { createServer } from 'http';
import express from 'express';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { WebSocketServer } from 'ws';
import cors from 'cors';
import { useServer } from 'graphql-ws/lib/use/ws';
import resolvers from './graphql/resolvers'
import typeDefs from "./graphql/schema";
import mongoose from "mongoose";

async function startServer(){
    const app = express();

    const httpServer = createServer(app);

    const schema = makeExecutableSchema({
        resolvers,
        typeDefs,
    });

    // Creating the WebSocket server
    const wsServer = new WebSocketServer({
        // This is the `httpServer` we created above.
        server: httpServer,
        // Subscriptions are served at ws://localhost:4000/subscriptions,
        // separate from the /graphql HTTP endpoint used by expressMiddleware.
        path: '/subscriptions',
    });

    // Hand in the schema we just created and have the
    // WebSocketServer start listening.
    const serverCleanup = useServer({ schema }, wsServer);

    const apolloServer = new ApolloServer({
        schema,
        introspection:true,
        plugins: [
            // Proper shutdown for the HTTP server.
            ApolloServerPluginDrainHttpServer({ httpServer }),

            // Proper shutdown for the WebSocket server.
            {
                async serverWillStart() {
                    return {
                        async drainServer() {
                            await serverCleanup.dispose();
                        },
                    };
                },
            },
        ],
    });

    await apolloServer.start();

    app.use('/graphql', cors<cors.CorsRequest>(), express.json(), expressMiddleware(apolloServer));

    const DATABASE_URL: string = `mongodb://localhost:27017/test`
    mongoose.connect(DATABASE_URL)
        .then(() => {
            console.log('Database connected successfully')
        })
        .catch((e: any) => {
            console.log('Error connecting : ', e?.message)
        })

    const PORT = 4000;
    // Now that our HTTP server is fully set up, we can listen to it.
    httpServer.listen(PORT, () => {
        console.log(`Server is now running on http://localhost:${PORT}/graphql`);
    });
}

startServer();
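
To make the WebSocket side less of a black box, here is a rough sketch of the JSON messages exchanged on the /subscriptions socket under the graphql-ws protocol (WebSocket subprotocol `graphql-transport-ws`). The message type names follow that protocol, but only a subset is shown; the helper functions `openingMessages` and `dataFor` and the subscription id `'1'` are assumptions made for this illustration. In practice, the graphql-ws client builds and parses these messages for us.

```typescript
// Messages a client sends over the socket (subset of the graphql-ws protocol).
type ClientMessage =
  | { type: 'connection_init'; payload?: Record<string, unknown> }
  | { type: 'subscribe'; id: string; payload: { query: string; variables?: Record<string, unknown> } }
  | { type: 'complete'; id: string };

// Messages the server sends back (subset).
type ServerMessage =
  | { type: 'connection_ack' }
  | { type: 'next'; id: string; payload: { data?: unknown; errors?: unknown[] } }
  | { type: 'error'; id: string; payload: unknown[] }
  | { type: 'complete'; id: string };

// Hypothetical helper: the messages a client sends to start one subscription.
// The `subscribe` message is only sent after the server has answered the
// `connection_init` message with `connection_ack`.
function openingMessages(id: string, query: string): ClientMessage[] {
  return [
    { type: 'connection_init' },
    { type: 'subscribe', id, payload: { query } },
  ];
}

// Hypothetical helper: extract the data of a `next` message for one subscription id.
// Returns undefined for any other message.
function dataFor(id: string, raw: string): unknown {
  const message = JSON.parse(raw) as ServerMessage;
  if (message.type === 'next' && message.id === id) {
    return message.payload.data;
  }
  return undefined;
}

// Usage: what goes out, and how an incoming event is read.
const sent = openingMessages('1', 'subscription { postCreated { _id title content } }');
const received = dataFor(
  '1',
  JSON.stringify({
    type: 'next',
    id: '1',
    payload: { data: { postCreated: { _id: '1', title: 'Hello', content: 'First post' } } },
  }),
);
```

Each published event arrives as one `next` message carrying the subscription id, which is how a single socket can serve several subscriptions at once.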

Implement the resolvers for subscription

Resolvers for Subscription fields differ from Query and Mutation resolvers: instead of a function, each subscription field resolver is an object that defines a subscribe function. Let's implement a resolver for the postCreated subscription:

Subscription: {
    postCreated: {
        subscribe: () => pubsub.asyncIterator(['POST_CREATED']),
    },
}

  1. The subscribe function must return an AsyncIterator, a standard interface for iterating over asynchronous results.

  2. The AsyncIterator is created by pubsub.asyncIterator (renamed to asyncIterableIterator in graphql-subscriptions v3 and later).

  3. The iterator returned by pubsub.asyncIterator listens for events published under the given label and queues them for delivery to the subscriber.
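
To see what this queueing behaviour means in practice, here is a minimal in-memory sketch of the idea. It is not the graphql-subscriptions implementation: `TinyPubSub`, `Listener`, and `demo` are names invented for this example, and the sketch supports only a single label per iterator.

```typescript
type Listener<T> = (payload: T) => void;

// Illustrative only: a tiny pub/sub whose subscribers are async iterators.
class TinyPubSub<T> {
  private listeners = new Map<string, Set<Listener<T>>>();

  // Deliver a payload to every listener registered under the label.
  publish(label: string, payload: T): void {
    this.listeners.get(label)?.forEach((listener) => listener(payload));
  }

  // Return an async iterator that yields payloads published under the label.
  asyncIterator(label: string): AsyncIterableIterator<T> {
    const queue: T[] = []; // events not yet consumed
    const pending: Array<(result: IteratorResult<T>) => void> = []; // next() calls waiting for an event
    const finished: IteratorResult<T> = { value: undefined, done: true };
    let closed = false;

    // Hand the event to a waiting next() call, or queue it for later.
    const listener: Listener<T> = (payload) => {
      const resolve = pending.shift();
      if (resolve) {
        resolve({ value: payload, done: false });
      } else {
        queue.push(payload);
      }
    };

    const subscribers = this.listeners.get(label) ?? new Set<Listener<T>>();
    subscribers.add(listener);
    this.listeners.set(label, subscribers);

    const iterator: AsyncIterableIterator<T> = {
      next() {
        if (closed) return Promise.resolve(finished);
        if (queue.length > 0) {
          return Promise.resolve<IteratorResult<T>>({ value: queue.shift() as T, done: false });
        }
        return new Promise<IteratorResult<T>>((resolve) => pending.push(resolve));
      },
      // Called when the consumer stops iterating: unsubscribe and release waiters.
      return() {
        closed = true;
        subscribers.delete(listener);
        pending.splice(0).forEach((resolve) => resolve(finished));
        return Promise.resolve(finished);
      },
      [Symbol.asyncIterator]() {
        return iterator;
      },
    };
    return iterator;
  }
}

// Usage: events published before next() is called wait in the queue.
async function demo(): Promise<string[]> {
  const pubsub = new TinyPubSub<{ title: string }>();
  const iterator = pubsub.asyncIterator('POST_CREATED');

  pubsub.publish('POST_CREATED', { title: 'first' });
  pubsub.publish('POST_CREATED', { title: 'second' });

  const titles: string[] = [];
  for await (const event of iterator) {
    titles.push(event.title);
    if (titles.length === 2) break; // break calls iterator.return(), which unsubscribes
  }
  return titles; // ['first', 'second']
}
```

Note that in this sketch, events published before asyncIterator is called are simply dropped, because no listener is registered yet.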

Publish an event

The next step is to publish the event that subscribers should be notified about. This typically happens when a new database entry is created or when an update is made from the admin side.

In this example, we will publish a "POST_CREATED" event whenever a new post is created using the createPost mutation.

createPost: async (_parent: any, args: any, _context: any) => {
    // create new post document
    const post = new Post(args);
    // save post document
    await post.save();
    // publish the "POST_CREATED" event with the saved post as payload
    await pubsub.publish('POST_CREATED', { postCreated: post });
    // return the saved document
    return post;
}

The pubsub.publish function publishes the event along with its payload, which in this case is the post data.
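
The key used in the payload matters. Each published payload serves as the root value when the subscription's selection set is executed, so without a custom resolve function the postCreated field is read from `payload.postCreated`. The sketch below imitates that default lookup in a simplified way; `defaultFieldLookup` is a hypothetical name used for illustration, not a library function.

```typescript
// Simplified stand-in for GraphQL's default field resolution on a subscription event:
// the field's value is the payload property that has the same name as the field.
function defaultFieldLookup(payload: Record<string, unknown>, fieldName: string): unknown {
  return payload[fieldName];
}

const samplePost = { _id: '1', title: 'Hello', content: 'First post' };

// The key matches the Subscription field name: the client receives the post.
const matching = defaultFieldLookup({ postCreated: samplePost }, 'postCreated');

// The key does not match: the lookup yields undefined, which GraphQL reports as null.
const mismatched = defaultFieldLookup({ newPost: samplePost }, 'postCreated');
```

This is why the mutation publishes `{ postCreated: post }` rather than the bare post document.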

Here is the full example of all the resolvers that are needed:

import {posts, users, users_posts} from "../utils/data";
import Post from "../models/Post";
import User from "../models/User";
import bcrypt from 'bcrypt';
import jwt from 'jsonwebtoken';
import { PubSub } from 'graphql-subscriptions';

const pubsub = new PubSub();


const resolvers = {
    Query: {
        posts: async (_parent: any, _args: any, context: any) => {
            return Post.find();
        },
    },
    Mutation: {
        // Mutation that will create a new post and save in DB
        createPost: async (_parent: any, args: any, _context: any) => {
            // create new post document
            const post = new Post(args);
            //save post document and return the saved document
            await post.save();
            // publish the "POST_CREATED" event
            await pubsub.publish('POST_CREATED', {postCreated: post});
            return post;
        }
    },
    Subscription:{
        postCreated: {
            // resolving subscribe function
            subscribe: () => pubsub.asyncIterator(['POST_CREATED']),
        },
    }
}

export default resolvers;

Now that the server is ready, we can test our subscription.

Test our API

Run the server using the npm run start:dev command.

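To test it, open two windows. In one window, subscribe to postCreated (the subscription client has to connect to ws://localhost:4000/subscriptions, the path configured for the WebSocket server). In the other window, run the createPost mutation. The newly created post shows up in the subscription window as soon as the mutation completes. In the same way, we can use this system to build a full-fledged notification system, including authentication, authorization, and filtering.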
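
Filtering, for instance, comes down to dropping the events that a given subscriber should not see before they reach the client. The graphql-subscriptions package provides a `withFilter` helper for this purpose; the sketch below only illustrates the underlying idea with a plain async generator. `filterEvents`, `sampleEvents`, `titlesBy`, `PostEvent`, and the `authorId` field are assumptions made for this example (the Post type in this article has no author field).

```typescript
// Hypothetical event shape with an author, used only for this illustration.
interface PostEvent {
  postCreated: { _id: string; title: string; authorId: string };
}

// Wrap a source of events and only pass along those that satisfy the predicate.
async function* filterEvents<T>(
  source: AsyncIterable<T>,
  keep: (event: T) => boolean | Promise<boolean>,
): AsyncGenerator<T, void, undefined> {
  for await (const event of source) {
    if (await keep(event)) {
      yield event;
    }
  }
}

// Stand-in event source for the demo; a real one would come from the pub/sub layer.
async function* sampleEvents(): AsyncGenerator<PostEvent, void, undefined> {
  yield { postCreated: { _id: '1', title: 'From Ann', authorId: 'ann' } };
  yield { postCreated: { _id: '2', title: 'From Bob', authorId: 'bob' } };
  yield { postCreated: { _id: '3', title: 'Ann again', authorId: 'ann' } };
}

// Collect the titles of the posts written by one author.
async function titlesBy(authorId: string): Promise<string[]> {
  const titles: string[] = [];
  const filtered = filterEvents(sampleEvents(), (event) => event.postCreated.authorId === authorId);
  for await (const event of filtered) {
    titles.push(event.postCreated.title);
  }
  return titles;
}
```

Calling `titlesBy('ann')` resolves to the two posts written by that author, while the post from the other author never reaches the consumer.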

GraphQL subscriptions offer a powerful solution for real-time data updates in web applications. By leveraging subscriptions, developers can enhance the user experience by providing instant updates without the need for constant polling. Implementing GraphQL subscriptions is quite fun, and I hope you enjoyed it.

Here is the GitHub link: https://github.com/icon-gaurav/mastering-graphql-with-nodejs/tree/graphql-subscriptions

Note: The PubSub class is not recommended for production because it uses an in-memory event system, which only works within a single server instance. For production, use a PubSub implementation backed by an external datastore that fits your use case.

You can check the library list and further info here: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
