// GraphQL subscription server using Socket.IO, backed by Redis Pub/Sub.
// Registry of listener callbacks keyed by Redis channel name. Created without
// a prototype so a channel name can never resolve to an inherited
// Object.prototype member.
const listeners = Object.create(null);

// Redis client used in this file for subscribe/unsubscribe calls and for
// receiving 'message' events.
const redisClient = redis.createClient(REDIS_URL);
/**
 * Registers `listener` for messages published on `channel`.
 *
 * The first registration for a channel issues `redisClient.subscribe`;
 * removing the last one issues `redisClient.unsubscribe` and drops the
 * channel's entry from the registry.
 *
 * @param channel  Redis channel name used as the registry key.
 * @param listener Callback appended to the channel's listener list.
 * @returns A function that removes this registration. It is idempotent:
 *   calling it again after the first call does nothing.
 */
function addListener(channel, listener) {
  if (!listeners[channel]) {
    listeners[channel] = [];
    redisClient.subscribe(channel);
  }
  listeners[channel].push(listener);

  let removed = false;
  return () => {
    // Guard against repeated calls: without it, a second call after the
    // channel entry was deleted would dereference `undefined`, and a second
    // call after a re-registration would strip someone else's listener.
    if (removed) {
      return;
    }
    removed = true;

    const current = listeners[channel];
    if (!current) {
      // The channel was already torn down by another remover.
      return;
    }

    const remaining = current.filter(item => item !== listener);
    if (remaining.length) {
      listeners[channel] = remaining;
      return;
    }

    // Last listener is gone: release the Redis subscription as well.
    redisClient.unsubscribe(channel);
    delete listeners[channel];
  };
}
// Fans each Redis message out to every listener registered for its channel.
// The raw message is JSON-parsed, converted with getResponseData, and the
// result is handed to each listener. Failures in one listener are logged and
// do not prevent delivery to the others.
redisClient.on('message', (channel, message) => {
  const channelListeners = listeners[channel];
  if (!channelListeners) {
    return;
  }

  let body;
  try {
    body = JSON.parse(message);
  } catch (e) {
    console.error(`malformed message on ${channel}: ${message}`);
    return;
  }
  // Valid JSON that parses to a falsy value (null, 0, "", false) is dropped.
  if (!body) {
    return;
  }

  const data = getResponseData(body);
  const reportFailure = (e) => {
    console.error(`listener failed on ${channel}:`, e);
  };

  channelListeners.forEach((listener) => {
    try {
      // Listeners may return a promise; Promise.resolve lets a rejection be
      // caught here instead of surfacing as an unhandled rejection.
      Promise.resolve(listener(data)).catch(reportFailure);
    } catch (e) {
      // Synchronous throw: log it and keep delivering to the rest.
      reportFailure(e);
    }
  });
});
// Socket.IO server bound to `server`. `serveClient: false` is forwarded as a
// Socket.IO option (presumably disables hosting the client bundle — confirm
// against the installed Socket.IO version).
const io = new IoServer(server, { serveClient: false });
io.on('connection', (socket) => {
// Turn the request into an Express request for consistency.
const req = Object.create((express: any).request);
Object.assign(req, socket.request, { app });
let authorization = null;
let credentials = null;
socket.on('authenticate', (token) => {
authorization = `Bearer ${token}`;
try {
const credentials = getCredentials(token);
} catch (e) {
credentials = null;
socket.emit('error', { code: 'invalid_token' });
}
});
const unsubscribeMap = Object.create(null);
socket.on('subscribe', async ({ id, query, variables }) => {
if (unsubscribeMap[id]) {
socket.emit('error', {
code: 'invalid_id.duplicate',
detail: id,
});
return;
}
function subscribe(channel) {
unsubscribeMap[id] = addListener(channel, async (body) => {
if (!authorizeRead(body, credentials)) {
// User is not authorized to view this update.
return;
}
const result = await graphql(
schema,
query,
body,
createContext(req, authorization),
variables,
);
socket.emit('subscription update', { id, ...result });
});
}
const result = await graphqlSubscribe({
schema,
query,
variables,
context: { subscribe },
});
if (result.errors) {
socket.emit('error', {
code: 'subscribe_failed',
detail: result.errors,
});
}
});
socket.on('unsubscribe', (id) => {
const unsubscribe = unsubscribeMap[id];
if (!unsubscribe) {
return;
}
unsubscribe();
delete unsubscribeMap[id];
});
socket.on('disconnect', () => {
Object.keys(unsubscribeMap).forEach((id) => {
unsubscribeMap[id]();
});
});
});