(WebSocket/AWS) WebSocket API – Lambda 함수 작성



① 기본 구성

2023.04.06 – (Cloud/Public Cloud) – (AWS) API Gateway: WebSocket API 사용

(AWS) API 게이트웨이: WebSocket API 사용

안녕하세요~ 저는 지금 매우 들뜬 상태입니다~ 왜냐하면.. AWS WebSocket API를 사용하고 싶어서 ㅠㅠㅠ ERROR: 예기치 않은 서버 응답: 500이 계속 올라옵니다.

alsrudalsrudalsrud.

» 변경: DynamoDB 대신 RDS 연결

② 람다 코드 작성

※ 임포트할 모듈을 zip 파일로 업로드해야 합니다.

연결

const AWS = require('aws-sdk');
const mysql = require('mysql2'); # Promise 반환을 지원하는 mysql2 라이브러리로 변경

// Connection RDS MySQL
const connection = mysql.createConnection({
  host: 'RDS endpoint',
  port: '3306',
  user: 'admin',
  password: 'qwer1234',
  database: 'rds01',
});

exports.handler = async function (event, context) {
  const connectionId = event.requestContext.connectionId;
  const roomName = parseInt(event.queryStringParameters.roomName);
  
  // ApiGatewayManagementApi
  const callbackAPI = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage,
  });
  
  // roomName, connectionId 저장 : class ConnectionId
  try {
    // insert data
    let params = {
      roomName : roomName,
      cid : connectionId,
    };
    
    // insert data into table
    const save_sql="insert into chat01_connectionid set ?";
    await connection.promise().query(save_sql, params);
  } catch (error) {
    console.error(`error:: ${error}`); # 현재 DB에 웹 소켓 연결이 끊긴 Connection_id가 계속 들어있을 시 오류날 확률이 매우 높음

    return {
      statusCode: 500,
      error_message: error.message,
    };
  }

  return {
    statusCode: 200,
  };
};

같은 대화방 입장: roomName 사용

문자 보내

const AWS = require('aws-sdk');
const mysql = require('mysql2'); # Promise 반환을 지원하는 mysql2 라이브러리로 변경

// Connection RDS MySQL
let connection = mysql.createConnection({
  host: 'RDS endpoint',
  port: '3306',
  user: 'admin',
  password: 'qwer1234',
  database: 'rds01',
});

exports.handler = async function (event, context) {
  console.log('메세지 이벤트 실행');

  // ApiGatewayManagementApi
  const callbackAPI = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage,
  });

  // Django에서 보낸 채팅 메세지
  let message = JSON.parse(event.body).message;
  const to_user = parseInt(JSON.parse(event.body).to_user);
  const from_user = parseInt(JSON.parse(event.body).from_user);
  const room_id = parseInt(JSON.parse(event.body).room_id);
  
  // 원래 메세지 DB 에서 불러오기
  let sql="select cid from chat01_connectionid where roomName=" + room_id;
  const return_sql="select message from chat01_chatting where room_id =" + room_id;
  try {
    // 저장되어 있던 DB array 저장
    const (results) = await connection.promise().query(return_sql);
    
    let return_arr = ();
    for (let i = 0; i < results.length; i++) {
      return_arr.push('0'+results(i)('message'));
    }
    console.log(return_arr);

    // 메세지 전송
    const (row, fields) = await connection.promise().query(sql);

    for (let j = 0; j < return_arr.length; j++) {
      const ReturnMessages = row.map(({ cid: ConnectionId }) => {
        return callbackAPI.postToConnection({ ConnectionId, Data: return_arr(j) }).promise();
      });
      console.log(return_arr(j));
      await Promise.all(ReturnMessages);
    }
  } catch (error) {
    console.error(`error:: ${error}`); # 현재 DB에 웹 소켓 연결이 끊긴 Connection_id가 계속 들어있을 시 오류날 확률이 매우 높음

    return {
      statusCode: 500,
      error_message: error.message,
    };
  }
  
  // DB에 새로운 메세지 저장 : class Chatting
  try {
    // insert data
    let params = {
      room_id : room_id,
      message : message,
      to : to_user,
      from_user : from_user,
    };
    
    // insert data into table
    let insert_sql="insert into chat01_chatting set ?";
    await connection.promise().query(insert_sql, params);
  } catch (error) {
    console.error(`error:: ${error}`); // 현재 DB에 웹 소켓 연결이 끊긴 Connection_id가 계속 들어있어 오류날 확률이 매우 높으므로 DB 데이터 정리 후 실행해야할 듯

    return {
      statusCode: 500,
      error_message: error.message,
    };
  }
  
  // 새로운 메세지 전송
  sql="select cid from chat01_connectionid where roomName=" + room_id;
  try {
    const (rows, fields) = await connection.promise().query(sql);
    
    message="1"+ message;
    // 기존 코드에서 오류가 있었던 이유는 DB 컬럼명이 cid인데, ConnectionId로 불러와 제대로 참조하지 못함.
    // cid 컬럼 변수 명을 ConnectionId로 변경
    const sendMessages = rows.map(({ cid: ConnectionId }) => {
      return callbackAPI.postToConnection({ ConnectionId, Data: message }).promise();
    });
    
    // Array.map 메소드는 동기함수이고, callbackAPI... 메소드는 비동기함수라 map 내부 콜백에 async-await을 사용해도 소용없음.
    // Promise.all로 비동기 함수가 모두 실행될 수 있도록 binding해주고, await으로 함수 실행을 보장시켜줘야함.
    const result = await Promise.all(sendMessages);
    console.log(result);
  } catch (error) {
    console.error(`error:: ${error}`); // 현재 DB에 웹 소켓 연결이 끊긴 Connection_id가 계속 들어있어 오류날 확률이 매우 높으므로 DB 데이터 정리 후 실행해야할 듯

    return {
      statusCode: 500,
      error_message: error.message,
    };
  } //finally {
    // 이 람다함수가 매우 자주 실행되는 경우 Connection을 끊지 않고 계속 사용하는 것도 고려
    //await connection.end();
  //}

  return { statusCode: 200 };
};

과거 채팅 검색

새 채팅이 저장되고 화면에 인쇄됩니다.

같은 대화방(roomName)이 있는 connectionId에게만 메시지 보내기

연결 해제

import json
import pymysql

def lambda_handler(event, context):
    connectionId = event('requestContext')('connectionId')
    conn = pymysql.connect(
        host="RDS endpoint",
        user="user01",
        password='qwer1234',
        db='rds01',
        charset="utf8"
    )
    cur = conn.cursor()
    sql = "DELETE FROM chat01_connectionid WHERE cid = '" + connectionId + "';";
    cur.execute(sql)
    conn.commit()
    #conn.close()
    
    # TODO implement
    return {
        'statusCode': 200,
    }

Node.js로 했습니다.

^^ 안되서 파이썬으로 바꿨습니다.

대화방과 연결이 끊긴 connectionId 삭제

③ Django 코드는 WebSocket API(Lambda)를 호출합니다.

<script type="text/javascript">
let roomName = "{{ room_id | escapejs }}";
let to = "{{ to | escapejs }}";

# websocket api url
let chatSocket = new WebSocket(
    `(websocket api 주소)?roomName=`+roomName
);

# websocket connect
chatSocket.onopen = (e) => {
    console.log(e);
};

# websocket 에서 전달받은 메세지 출력
let cnt = 0;
chatSocket.onmessage = (e) => {
    console.log(e.data);
    let message = e.data
    if (message(0) == '0' && cnt == 0) {	# 기존 메세지
        document.querySelector("#chat-log").value += (message.substring(1) + '\n');
    }
    if (message(0) == '1') {	# 새로운 메세지
        cnt = 1;
        document.querySelector("#chat-log").value += (message.substring(1) + '\n');
    }
};

# websocket disconnect
chatSocket.onclose = (e) => {
    console.error('Chat socket closed unexpectedly');
};

# enter 키로 메세지 전송
document.querySelector("#chat-message-input").focus();
document.querySelector("#chat-message-input").addEventListener("keyup",(e) => {
    if (e.keyCode === 13) {
        document.querySelector("#chat-message-submit").click();
    }
});
# send 버튼으로 메세지 전송
document.querySelector("#chat-message-submit").addEventListener("click", (e) => {
    let messageInputDom = document.querySelector("#chat-message-input");
    let message = messageInputDom.value;
    chatSocket.send(JSON.stringify({
        'action': 'sendmessage',
        'message' : '{{ request.session.user_id }} : ' + message,
        'to_user' : to,
        'from_user' : {{ request.session.user_uid }},
        'room_id' : roomName
    }));
    messageInputDom.value="";
});

</script>