프로그래밍 기초

Socket.IO 소개 3 - adapter

hs-archive 2023. 7. 28. 18:51

이전 글...

Socket.IO 소개 1 - server

Socket.IO 소개 2 - event

 

Adapter 소개

어댑터는 모든 클라이언트 또는 클라이언트 하위 집합에 이벤트를 브로드캐스팅하는 서버 측 구성 요소입니다. 여러 Socket.IO 서버로 확장할 때 기본 in-memory adapter를 다른 구현으로 교체해야 이벤트가 모든 클라이언트로 적절하게 라우팅됩니다. in-memory adapter 외에도 다섯 가지 공식 구현이 있습니다. 다음 다섯 가지 어댑터는 이 포스팅에서 배울 것입니다.

 

  - the Redis adapter

  - the Redis Streams adapter

  - the MongoDB adapter

  - the Postgres adapter

  - the Cluster adapter

 

커뮤니티에서 관리하는 몇 가지 다른 옵션도 있습니다.

  - AMQP (e,g,m RabbitMQ)

  - NATS

  - NATS

 

여러 Socket.IO 서버와 HTTP long-polling을 사용하는 경우 고정 세션을 활성화해야 합니다. 자세한 내용은 여기를 참조해 주세요.

 

API

// main namespace
const mainAdapter = io.of("/").adapter; // WARNING! io.adapter() will not work

// custom namespace
const adminAdapter = io.of("/admin").adapter;

위와 같이 어댑터 인스턴스에 액세스할 수 있습니다. socket.io@3.1.0부터 각 어댑터 인스턴스는 다음 이벤트를 emit합니다. 아래 join과 leave의 argument에 있는 id는 socketID를 뜻합니다.

 

  - create-room (argument: room): room을 만들 때 해당 이벤트가 emit됩니다.
  - delete-room (argument: room): room을 없앨 때 해당 이벤트가 emit됩니다.
  - join-room (argument: room, id): room에 join할 때 해당 이벤트가 emit됩니다.
  - leave-room (argument: room, id): room을 leave할 때 해당 이벤트가 emit됩니다.

 

example:

io.of("/").adapter.on("create-room", (room) => {
  console.log(`room ${room} was created`);
});

io.of("/").adapter.on("join-room", (room, id) => {
  console.log(`socket ${id} has joined room ${room}`);
});

 

Emitter

대부분의 어댑터 구현은 관련 emitter 패키지와 함께 제공되며, 이를 통해 다른 Node.js 프로세스에서 Socket.IO 서버 그룹과 통신할 수 있습니다. 이는 예를 들어 모든 클라이언트가 마이크로서비스 M1에 연결되고 마이크로서비스 M2가 emitter를 사용하여 패킷을 브로드캐스트(단방향 통신)하는 마이크로 서비스 설정에서 유용할 수 있습니다.

 

Emitter cheatsheet

// 모든 클라이언트에게 emit
emitter.emit(/* ... */);

// room1에 속한 모든 클라이언트에게 emit
emitter.to("room1").emit(/* ... */);

// "room2"를 제외한 "room1"의 모든 클라이언트에게 emit
emitter.to("room1").except("room2").emit(/* ... */);

const adminEmitter = emitter.of("/admin");

// "admin" 네임스페이스의 모든 클라이언트에게 emit
adminEmitter.emit(/* ... */);

// "admin" 네임스페이스의 "room1"에 속한 모든 클라이언트에게 emit
adminEmitter.to("room1").emit(/* ... */);
// socketsJoin()
// 모든 소켓 인스턴스들을 "room1"에 join 시킵니다.
emitter.socketsJoin("room1");

// "admin" 네임스페이스의 "room1" 방에 있는 모든 소켓 인스턴스들을 "room2"에 join 시킵니다.
emitter.of("/admin").in("room1").socketsJoin("room2");



// socketsLeave()
// 모든 소켓 인스턴스들을 "room1"을 나가도록 합니다.
emitter.socketsLeave("room1");

// "room1" 방의 모든 소켓 인스턴스가 "room2" 및 "room3" 방을 나가도록 합니다.
emitter.in("room1").socketsLeave(["room2", "room3"]);

// "admin" 네임스페이스의 "room1" 방에 있는 모든 소켓 인스턴스들을 "room2" 방에서 나가도록 합니다.
emitter.of("/admin").in("room1").socketsLeave("room2");



// disconnectSockets()
// 모든 소켓 인스턴스 연결 해제
emitter.disconnectSockets();

// "room1" 방의 모든 소켓 인스턴스를 연결 해제하고 저수준 연결을 버립니다.
emitter.in("room1").disconnectSockets(true);

// "admin" 네임스페이스의 "room1" 방에 있는 모든 소켓 인스턴스 연결을 끊습니다.
emitter.of("/admin").in("room1").disconnectSockets();

// 이것은 단일 소켓 ID로도 작동합니다.
emitter.of("/admin").in(theSocketId).disconnectSockets();



// serverSideEmit(): 클러스터의 모든 Socket.IO 서버에 이벤트를 emit합니다.
// 서버 측
emitter.serverSideEmit("hello", "world");

io.on("hello", (arg) => {
  console.log(arg); // prints "world"
});

emitter는 socket.io@4.0.0에 추가된 유틸리티 메서드(socketsJoin(), socketsLeave(), disconnectSockets(), serverSideEmit())도 지원합니다.

 

Redis adapter

Redis adapter

Redis adapter는 Redis Pub/Sub 매커니즘에 의존합니다. multiple 클라이언트(e.g., io.to("room1").emit() 또는 socket.broadcast.emit())에게 전송되는 모든 패킷은 다음 과정을 거칩니다.

 

  - 현재 서버에 연결된 일치하는 모든 클라이언트에게 전송

  - Redis 채널에 게시되고 클러스터의 다른 Socket.IO 서버에서 수신됨

 

Usage

import { Server } from "socket.io";
import { createAdapter } from "@socket.io/redis-adapter";
import { createClient } from "redis";

const io = new Server();

const pubClient = createClient({ url: "redis://localhost:6379" });
const subClient = pubClient.duplicate();

Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
  io.adapter(createAdapter(pubClient, subClient));
  io.listen(3000);
});



// redis@3
import { Server } from "socket.io";
import { createAdapter } from "@socket.io/redis-adapter";
import { createClient } from "redis";

const io = new Server();

const pubClient = createClient({ url: "redis://localhost:6379" });
const subClient = pubClient.duplicate();

io.adapter(createAdapter(pubClient, subClient));
io.listen(3000);

참고로 redis@3을 사용하면 Redis 클라이언트에서 connect()를 호출할 필요가 없습니다.

 

import { Server } from "socket.io";
import { createAdapter } from "@socket.io/redis-adapter";
import { Cluster } from "ioredis";

const io = new Server();

const pubClient = new Cluster([
  {
    host: "localhost",
    port: 6380,
  },
  {
    host: "localhost",
    port: 6381,
  },
]);

const subClient = pubClient.duplicate();

io.adapter(createAdapter(pubClient, subClient));
io.listen(3000);

ioredis를 사용하면 위와 같이 작성하면 됩니다.

 

Common questions

Redis 어댑터를 사용할 때 여전히 고정 세션을 활성화해야 합니까?

예, 그렇게 하지 않으면 HTTP 400 응답이 발생합니다. (you are reaching a server that is not aware of the Socket.IO session) 자세한 내용은 여기에서 확인할 수 있습니다.

 

Redis 서버가 다운되면 어떻게 됩니까?

Redis 서버와의 연결이 끊어진 경우 현재 서버에 연결된 클라이언트로만 패킷이 전송됩니다.

 

Emitter

Redis emitter

Redis emitter를 사용하면 다른 Node.js 프로세스에서 연결된 클라이언트로 패킷을 보낼 수 있습니다.

 

Usage

import { Emitter } from "@socket.io/redis-emitter";
import { createClient } from "redis";

const redisClient = createClient({ url: "redis://localhost:6379" });

redisClient.connect().then(() => {
  const emitter = new Emitter(redisClient);

  setInterval(() => {
    emitter.emit("time", new Date);
  }, 5000);
});



// redis@3
import { Emitter } from "@socket.io/redis-emitter";
import { createClient } from "redis";

const redisClient = createClient({ url: "redis://localhost:6379" });
const emitter = new Emitter(redisClient);

setInterval(() => {
  emitter.emit("time", new Date);
}, 5000);

redis@3을 사용하면 Redis 클라이언트에서 connect()를 호출할 필요가 없습니다.

 

Redis Streams adapter

이 어댑터는 Redis stream을 사용하여 Socket.IO 서버 간에 패킷을 전달합니다. 이전에 배웠던 Redis adapter(Redis Pub/Sub 매커니즘 사용)와의 주요 차이점은 이 어댑터는 Redis 서버에 대한 일시적인 연결 해제를 적절하게 처리하고 패킷 손실 없이 스트림을 재개한다는 것입니다.

 

Notes:

  - 모든 네임스페이스에 단일 스트림이 사용됩니다.

  - maxLen 옵션을 사용하면 스트림의 크기를 제한할 수 있습니다.

  - Redis Pub/Sub 매커니즘을 기반으로 하는 어댑터와 달리 이 어댑터는 Redis 서버에 대한 일시적인 연결 해재를 적절하게 처리하고 스트림을 재개합니다.

  - 연결 상태 복구가 활성화된 경우 세션은 Redis에 key/value 쌍으로 저장됩니다.

 

Usage

import { createClient } from "redis";
import { Server } from "socket.io";
import { createAdapter } from "@socket.io/redis-streams-adapter";

const redisClient = createClient({ host: "localhost", port: 6379 });

await redisClient.connect();

const io = new Server({
  adapter: createAdapter(redisClient)
});

io.listen(3000);

 

Common questions

Redis Stream adapter를 사용할 때 여전히 고정 세션을 활성화해야 합니까?

예, 그렇게 하지 않으면 HTTP 400 응답이 발생합니다.

 

Redis 서버가 다운되면 어떻게 됩니까?

기존 Redis adapter와 달리 Redis Stream adapter는 Redis 서버에 대한 일시적인 연결 끊김을 적절하게 처리하고 패킷 손실 없이 스트림을 재개합니다.

 

MongoDB adapter

MongoDB adapter

MongoDB adapter는 MongoDB의 Change Streams에 의존합니다. (따라서 복제본 세트 또는 분할된 클러스터가 필요합니다.) 여러 클라이언트(e.g., io.to("room1").emit() 또는 socket.broadcast.emit())에게 전송되는 모든 패킷은 아래 과정을 거칩니다.

 

  - 현재 서버에 연결된 일치하는 모든 클라이언트에게 전송

  - MongoDB capped collection에 삽입되고, 클러스터의 다른 Socket.IO 서버에서 수신됨

 

Usage

어댑터에서 생성된 MongoDB 문서를 cleam up하는 두 가지 방법이 있습니다. capped collection은 순환 버퍼 같은 것으로 오래된 것을 덮어 사용하는 방식이고 TTL은 말 그대로 일정 시간이 지나면 지워지는 것을 뜻합니다.

 

  - a capped collection

  - a TTL index

 

Usage with a capped collection

const { Server } = require("socket.io");
const { createAdapter } = require("@socket.io/mongo-adapter");
const { MongoClient } = require("mongodb");

const DB = "mydb";
const COLLECTION = "socket.io-adapter-events";

const io = new Server();

const mongoClient = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0", {
  useUnifiedTopology: true,
});

const main = async () => {
  await mongoClient.connect();

  try {
    await mongoClient.db(DB).createCollection(COLLECTION, {
      capped: true,
      size: 1e6
    });
  } catch (e) {
    // collection already exists
  }
  const mongoCollection = mongoClient.db(DB).collection(COLLECTION);

  io.adapter(createAdapter(mongoCollection));
  io.listen(3000);
}

main();

 

Usage with a TTL index

const { Server } = require("socket.io");
const { createAdapter } = require("@socket.io/mongo-adapter");
const { MongoClient } = require("mongodb");

const DB = "mydb";
const COLLECTION = "socket.io-adapter-events";

const io = new Server();

const mongoClient = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0", {
  useUnifiedTopology: true,
});

const main = async () => {
  await mongoClient.connect();

  const mongoCollection = mongoClient.db(DB).collection(COLLECTION);

  await mongoCollection.createIndex(
    { createdAt: 1 },
    { expireAfterSeconds: 3600, background: true }
  );

  io.adapter(createAdapter(mongoCollection, {
    addCreatedAtField: true
  }));
  io.listen(3000);
}

main();

 

Common questions

MongoDB 어댑터를 사용할 때 여전히 고정 세션을 활성화해야 합니까?

예, 그렇게 하지 않으면 HTTP 400 응답이 발생합니다.

 

MongoDB 클러스터가 다운되면 어떻게 됩니까?

MongoDB 클러스터에 대한 연결이 끊어진 경우 동작은 MongoDB 클라이언트의 bufferMaxEntries 옵션 값에 따라 달라집니다. 

 

  - 만약 value가 -1(기본값)이라면 재연결될 때까지 패킷이 버퍼링됩니다.

  - 만약 value가 0이면 패킷은 현재 서버에 연결된 클라이언트로만 전송됩니다.

 

Emitter

MongoDB emitter

MongoDB Emitter를 사용하면 다른 Node.js 프로세스와 연결된 클라이언트로 패킷을 보낼 수 있습니다.

 

Usage

const { Emitter } = require("@socket.io/mongo-emitter");
const { MongoClient } = require("mongodb");

const mongoClient = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0", {
  useUnifiedTopology: true,
});

const main = async () => {
  await mongoClient.connect();

  const mongoCollection = mongoClient.db("mydb").collection("socket.io-adapter-events");
  const emitter = new Emitter(mongoCollection);

  setInterval(() => {
    emitter.emit("ping", new Date());
  }, 1000);
}

main();

 

Postgres adapter

Postgres adapter

Postgres adapter는 NOTIFYLISTEN에 의존합니다. 여러 클라이언트(e.g., io.to("room1").emit() 또는 socket.broadcast.emit())에게 전송되는 모든 패킷은 다음 과정을 거칩니다.

 

  - 현재 서버에 연결된 일치하는 모든 클라이언트에게 전송

  - 패킷에 이진 데이터가 포함되어 있거나 8000바이트 제한을 초과하는 경우 패킷은 다음 과정을 거칩니다.

        - msgpack으로 인코딩되어 보조 테이블에 삽입

        - row ID는 NOTIFY 명령 내에서 전송됩니다.

        - 이 row ID는 테이블을 쿼리하고 패킷을 디코딩한 다음 연결된 클라이언트 집합에 브로드캐스트하는 클러스터의 다른 Socket.IO 서버에서 수신합니다.

  - 패킷에 이진 데이터가 포함되어 있지 않고, 8000바이트 제한을 초과하지 않는 경우 단순히 NOTIFY 명령 내에서 전송되고 클러스터의 다른 Socket.IO 서버에서 수신됩니다.

 

Usage

const { Server } = require("socket.io");
const { createAdapter } = require("@socket.io/postgres-adapter");
const { Pool } = require("pg");

const io = new Server();

const pool = new Pool({
  user: "postgres",
  host: "localhost",
  database: "postgres",
  password: "changeit",
  port: 5432,
});

pool.query(`
  CREATE TABLE IF NOT EXISTS socket_io_attachments (
      id          bigserial UNIQUE,
      created_at  timestamptz DEFAULT NOW(),
      payload     bytea
  );
`);

io.adapter(createAdapter(pool));
io.listen(3000);

 

Common questions

 

Postgres 어댑터를 사용할 때 여전히 고정 세션을 활성화해야 합니까?

예, 그렇게 하지 않으면 HTTP 400 응답이 발생합니다.

 

Postgres 서버가 다운되면 어떻게 됩니까?

Postgres 서버와의 연결이 끊어진 경우 현재 서버에 연결된 클라이언트로만 패킷이 전송됩니다. 

 

Emitter

Postgres emitter

Postgres emitter를 사용하면 다른 Node.js 프로세스와 연결된 클라이언트로 패킷을 보낼 수 있습니다.

 

Usage

const { Emitter } = require("@socket.io/postgres-emitter");
const { Pool } = require("pg");

const pool = new Pool({
  user: "postgres",
  host: "localhost",
  database: "postgres",
  password: "changeit",
  port: 5432,
});

const emitter = new Emitter(pool);

setInterval(() => {
  emitter.emit("ping", new Date());
}, 1000);

 

Cluster adapter

Cluster adapter

Cluster adapter를 사용하면 Node.js cluster 내에서 Socket.IO를 사용할 수 있습니다. 여러 클라이언트(e.g., io.to("room1").emit() 또는 socket.broadcast.emit())에게 전송되는 모든 패킷은 IPC 채널을 통해 다른 workers에게도 전송됩니다.

 

Usage

With Node.js cluster

const cluster = require("cluster");
const http = require("http");
const { Server } = require("socket.io");
const numCPUs = require("os").cpus().length;
const { setupMaster, setupWorker } = require("@socket.io/sticky");
const { createAdapter, setupPrimary } = require("@socket.io/cluster-adapter");

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);

  const httpServer = http.createServer();

  // 고정 세션 설정
  setupMaster(httpServer, {
    loadBalancingMethod: "least-connection",
  });

  // workers 간의 연결 설정
  setupPrimary();

  // 버퍼를 포함하는 패킷에 필요
  // plain 텍스트 객체만 보내는 경우 무시할 수 있음
  // Node.js < 16.0.0
  cluster.setupMaster({
    serialization: "advanced",
  });
  // Node.js > 16.0.0
  // cluster.setupPrimary({
  //   serialization: "advanced",
  // });

  httpServer.listen(3000);

  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on("exit", (worker) => {
    console.log(`Worker ${worker.process.pid} died`);
    cluster.fork();
  });
} else {
  console.log(`Worker ${process.pid} started`);

  const httpServer = http.createServer();
  const io = new Server(httpServer);

  // use the cluster adapter
  io.adapter(createAdapter());

  // primary process와의 연결 설정
  setupWorker(io);

  io.on("connection", (socket) => {
    /* ... */
  });
}

 

With PM2

관련 문서를 참조해 주세요.

 

With recluster

// cluster.js
const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");
const { setupPrimary } = require("@socket.io/cluster-adapter");
const recluster = require("recluster");
const path = require("path");

const httpServer = http.createServer();

// 고정 세션 setup
setupMaster(httpServer, {
  loadBalancingMethod: "least-connection",
});

// workers 간의 연결 setup
setupPrimary();

// 버퍼를 포함하는 패킷에 필요 
// (you can ignore it if you only send plaintext objects)
// Node.js < 16.0.0
cluster.setupMaster({
  serialization: "advanced",
});
// Node.js > 16.0.0
// cluster.setupPrimary({
//   serialization: "advanced",
// });

httpServer.listen(3000);

const balancer = recluster(path.join(__dirname, "worker.js"));

balancer.run();



// worker.js
const http = require("http");
const { Server } = require("socket.io");
const { setupWorker } = require("@socket.io/sticky");
const { createAdapter } = require("@socket.io/cluster-adapter");

const httpServer = http.createServer();
const io = new Server(httpServer);

// use the cluster adapter
io.adapter(createAdapter());

// primary process와의 연결 설정
setupWorker(io);

io.on("connection", (socket) => {
  /* ... */
});

 

 

 

 

 


https://socket.io/docs/v4/

 

Introduction | Socket.IO

What Socket.IO is

socket.io

'프로그래밍 기초' 카테고리의 다른 글

SOLID 원칙  (0) 2023.08.17
PostgreSQL vs MySQL  (0) 2023.08.12
Socket.IO 소개 2 - Event  (0) 2023.07.28
Socket.IO 소개 1 - Server  (0) 2023.07.27
CORS란 (Origin, SOP, CORS, CSP)  (0) 2023.07.19