peterschussheim
2/23/2017 - 11:25 PM

GraphQL subscription server with Socket.IO, backed by Redis Pub/Sub

GraphQL subscription server with Socket.IO, backed by Redis Pub/Sub

const redisClient = redis.createClient(REDIS_URL);
const listeners = Object.create(null);

function addListener(channel, listener) {
  if (!listeners[channel]) {
    listeners[channel] = [];
    redisClient.subscribe(channel);
  }

  listeners[channel].push(listener);

  return () => {
    listeners[channel] = listeners[channel].filter(
      item => item !== listener
    );

    if (!listeners[channel].length) {
      redisClient.unsubscribe(channel);
      delete listeners[channel];
    }
  };
}

redisClient.on('message', (channel, message) => {
  const channelListeners = listeners[channel];
  if (!channelListeners) {
    return;
  }

  let body;
  try {
    body = JSON.parse(message);
  } catch (e) {
    console.error(`malformed message on ${channel}: ${message}`);
  }

  if (!body) {
    return;
  }

  const data = getResponseData(body);

  channelListeners.forEach((listener) => {
    listener(data);
  });
});

const io = new IoServer(server, {
  serveClient: false,
});

io.on('connection', (socket) => {
  // Turn the request into an Express request for consistency.
  const req = Object.create((express: any).request);
  Object.assign(req, socket.request, { app });

  let authorization = null;
  let credentials = null;

  socket.on('authenticate', (token) => {
    authorization = `Bearer ${token}`;

    try {
      const credentials = getCredentials(token);
    } catch (e) {
      credentials = null;

      socket.emit('error', { code: 'invalid_token' });
    }
  });

  const unsubscribeMap = Object.create(null);

  socket.on('subscribe', async ({ id, query, variables }) => {
    if (unsubscribeMap[id]) {
      socket.emit('error', {
        code: 'invalid_id.duplicate',
        detail: id,
      });
      return;
    }

    function subscribe(channel) {
      unsubscribeMap[id] = addListener(channel, async (body) => {
        if (!authorizeRead(body, credentials)) {
          // User is not authorized to view this update.
          return;
        }

        const result = await graphql(
          schema,
          query,
          body,
          createContext(req, authorization),
          variables,
        );

        socket.emit('subscription update', { id, ...result });
      });
    }

    const result = await graphqlSubscribe({
      schema,
      query,
      variables,
      context: { subscribe },
    });

    if (result.errors) {
      socket.emit('error', {
        code: 'subscribe_failed',
        detail: result.errors,
      });
    }
  });

  socket.on('unsubscribe', (id) => {
    const unsubscribe = unsubscribeMap[id];
    if (!unsubscribe) {
      return;
    }

    unsubscribe();
    delete unsubscribeMap[id];
  });

  socket.on('disconnect', () => {
    Object.keys(unsubscribeMap).forEach((id) => {
      unsubscribeMap[id]();
    });
  });
});