들어가기에 앞서

본 포스팅에서는 블록체인보다는 푸시 알림 시스템에 대한 이야기가 주를 이루고 있고, 운영 중인 블록체인 서비스에 사용될 푸시 시스템을 개발한 경험을 공유하고자 합니다. 이해를 돕기 위한 간단한 코드들은 Scala로 작성되어 있습니다. 기존에 실시간 메시징을 유료로 사용할 수 있는 서비스로는 Firebase, SendBird, PubNub 등이 있습니다. 안정적인 서비스를 유료로 사용하여 개발과 운영 비용을 줄일 수 있었지만, 실시간 알림과 관련된 기술(Netty, Websocket, APNs, FCM 등)에 대해 전반적으로 이해하고 있었고 개인적으로 도전해보고 싶어서 직접 개발을 하게 되었습니다.


Pusher와 Blockchain

블록체인 기반의 서비스를 개발해보신 분이라면 아시겠지만, 요청에 대한 즉각적인 응답을 기대하는 HTTP 프로토콜과 달리, 블록체인 네트워크 상에서는 하나의 트랜잭션이 블록에 포함되기 까지는 길게 1시간이 넘게 소요되는 경우도 있습니다. 따라서 요청에 대한 응답을 비동기적으로 "" 처리해주는 것만으로도 서비스의 품질을 크게 개선할 수 있습니다.

현재 개발된 Pusher는 가상화폐를 담보로 현금을 대출해주는 블록체인 기반 서비스 Brick에 사용되고 있습니다. 대부업의 특성상 현재 대출건에 대한 알림, 청산에 대한 경고 등이 매우 중요했습니다. 현재 비트코인은 3 Block confirm, 이더리움은 6 Block confirm 기준으로 담보 입금이 완료되었다는 것을 정책으로 삼고 있는데, 여기에 소요되는 시간이 위에서 말씀드렸듯이 길게는 1시간이 걸릴 수 있습니다. 그래서 알림에 대한 중요성과 비동기적인 작업에 대한 처리를 잘 해줘야 된다는 것을 우선으로 설계할 필요가 있었습니다.


푸시 시스템

nPushKPusher(3rd party에 제공되기도 함)처럼 보통 각 회사마다 자사의 서비스에 사용될 글로벌 푸시 시스템을 가지고 있습니다. 이러한 푸시 시스템이 필요한 이유는 nPush에도 잘 소개되어 있듯이, 유저가 원하는 정보를 Pull(가져오기)하는 것과 달리, 유저가 원하든 원치 않든 정보를 Push(내려주기)해야 할 경우가 있기 때문입니다. 웹소켓 기반의 실시간 메시징을 지원하는 애플리케이션들이 대부분 그렇겠지만 카카오톡을 예로 들어보겠습니다. 애플리케이션은 서버와 연결된 채널이 존재하면 해당 채널을 통해 데이터를 받고, 연결된 채널이 존재하지 않으면 외부 푸시 시스템(APNs, FCM, GCM 등)을 통해 데이터를 전달받습니다. 데이터를 제공하는 서버 입장에서도 마찬가지입니다. 클라이언트와 연결된 채널이 존재하면 해당 데이터를 채널로 전송하고, 그렇지 않으면 외부 푸시 시스템을 통해 전달되도록 요청을 보냅니다.


Pusher란?

이러한 Pusher를 개발하기 전에 어떤 요구 사항이 있을지 생각해보았습니다.

  1. SessionManager: Pusher는 클라이언트와 연결된 채널(Session)을 관리하고 있어야 한다.
  2. Identity: Pusher는 해당 메시지를 어떤 클라이언트에게 전송할지 구분할 수 있어야 한다.
  3. Message: Pusher는 푸시 알림 메시지 뿐만 아니라 카카오톡 알림톡 메시지도 지원해야 한다.
  4. Clustering: Pusher가 여러 대의 서버(Node)로 운영될 경우, 노드 간에 클러스터링이 가능해야 한다.
  5. Durable: Pusher는 클라이언트와 연결이 끊겼을때 메시지 손실을 최소화 할 방안을 가지고 있어야 한다.
  6. SDK: 추가로, 클라이언트에서 사용할 SDK를 제공해주면 좋을것 같다.

Pusher Diagram

Pusher Diagram

1. SessionManager

클라이언트에게 메시지를 보내야할 때 외부 시스템에 의존하지 않고 연결된 채널로 메시지를 전송하기 위해 클라이언트와의 커넥션을 관리할 녀석이 필요했습니다. SessionManager는 클라이언트의 모든 세션을 갖고 있지 않고 현재 서버와 커넥션이 하나라도 있는 세션만 갖고 있습니다.

trait SessionManager {
    val sessions = new ConcurrentHashMap[Identity, SessionInfo]()
    val connections = new ConcurrentHashMap[Identity, ClientChannel]()

    def activeSessionsOf(identity: Identity): Set[SessionInfo]
    def connectionsOfSession(identity: Identity): Seq[ClientChannel]
    def addActiveSession(identity: Identity, channel: ClientChannel, validUntil: Timestamp): SessionInfo
    def removeChannel(channel: ClientChannel): Unit
    def closeIdentity(identity: Identity): Unit
    def forEachIdentity(func: Identity => Unit): Unit
}
Pusher SessionManager

2. Identity

Pusher는 메시지를 보내기 위해 직접 유저 정보를 찾지 않고, Pusher와 연결된 클라이언트(채팅 앱의 경우 채팅 서버 뿐만 아니라 모바일 클라이언트도 포함)로부터 메시지를 보낼 대상을 함께 전달 받습니다. Identity 는 appId를 가지고 있는데, 이 값은 해당 유저가 어떤 앱의 유저인지를 구분하는 역할을 합니다.

// Pusher Identity
sealed trait Identity {
    val appId: String
    def toJson: JObject
}

case class TokenIdentity(appId: String, token: String) extends Identity {
  override def toJson: JObject = ("appId" -> appId) ~ ("token" -> token)
}
case class UserIdentity(appId: String, userId: UserId) extends Identity {
  override def toJson: JObject = ("appId" -> appId) ~ ("userId" -> userId.toJson)
}
case class PhoneIdentity(appId: String, phoneNumber: PhoneNumber) extends Identity {
  override def toJson: JObject = ("appId" -> appId) ~ ("phone" -> phoneNumber.normalized)
}
Pusher Identity
  • TokenIdentity: 푸시 알림에 사용될 디바이스 토큰(DeviceToken) 값을 가진 Identity. 클라이언트와 연결된 채널이 존재하지 않는 경우 외부 푸시 시스템에 사용됨.
  • UserIdentity: 앱의 특정 유저 ID 값을 가진 Identity. 클라이언트와 연결된 채널이 존재하는 사용됨.
  • PhoneIdentity: 유저의 전화번호 정보를 가진 Identity. 카카오톡 알림톡 전송에 사용됨.

3. Message

푸시 알림 메시지 뿐만 아니라 카카오톡 알림톡 메시지도 지원하기 위해 간단한 메시지 모델을 만들어 보자면 다음과 같습니다.

object MsgType extends Enumeration {
    val FCM: MsgType.Value = Value(1)
    val APNs: MsgType.Value = Value(2)
    val ALIMTALK: MsgType.Value = Value(2)
}

sealed trait Message {
    val msgType: MsgType.Value

    def toJson: JObject
}

case class FcmPushMessage(
    identity: TokenIdentity,
    androidNotification: AndroidNotification,
    dataOpt: Option[Map[String, String]]
) extends Message {
    override val msgType: MsgType.Value = MsgType.FCM

    override def toJson: JObject =
        ("to" -> identity.toJson) ~
        ("notification" -> androidNotification.toJson) ~
        ("data" -> dataOpt)
}

case class ApnsPushMessage(
    apsAlert: ApsAlert,
    badgeOpt: Option[String],
    soundOpt: Option[String],
    dataOpt: Option[Map[String, String]]
) extends Message {
    override val msgType: MsgType.Value = MsgType.APNs

    override def toJson: JObject =
        ("aps" ->
            ("alert" -> apsAlert.toJson) ~
            ("badge" -> badgeOpt) ~
            ("sound" -> soundOpt)) ~
        ("data" -> dataOpt)
}

case class AlimtalkMessage(
    title: String,
    body: String,
    code: String,
    button: String
) extends Message {
    override val msgType: MsgType.Value = MsgType.ALIMTALK

    override def toJson: JObject =
        ("title" -> title) ~
        ("body" -> body) ~
        ("code" -> code) ~
        ("button" -> button)
}

case class AndroidNotification(
    title: String,
    body: String,
    badgeOpt: Option[String],
    soundOpt: Option[String],
    clickActionOpt: Option[String]
) {
    def toJson: JObject =
        ("title" -> title) ~
        ("body" -> body) ~
        ("badge" -> badgeOpt) ~
        ("sound" -> soundOpt) ~
        ("click_action" -> clickActionOpt)
}

case class ApsAlert(title: String, body: String) {
    def toJson: JObject = ("title" -> title) ~ ("body" -> body)
}
Pusher Message

4. Clustering

Pusher는 클라이언트를 다수의 노드에 골고루 분산시키기 위해 Consistent Hashing을 바탕으로 클라이언트가 연결할 노드의 번호를 선택합니다. Consistent Hashing은 다수의 노드로 이루어진 데이터를 저장할 때 데이터의 키를 기반으로 하여 어떤 노드에 저장할 것인가를 정하는 기술입니다.

Consistent Hashing

Consistent Hashing

클라이언트는 각 노드에 무작위로 분포되기 때문에 클라이언트의 입장에서는 분포가 균일하지 않을 수 있습니다. 이러한 문제를 해결하기 위해 각 노드의 주소를 가상 노드(Virtual node)를 링마다 배치하여 클라이언트가 더 균일하게 분포되도록 개선하였습니다.

object NodesHash {
    def keysFromNodeInfo(serverAddress: String): Seq[Int] = {
        val md5 = MessageDigest.getInstance("MD5")
        (0 until 64) flatMap { idx =>
            md5.reset()
            val hash = md5.digest(s"$ServerAddress-$idx".getBytes)
            (0 until 16 by 4) map { p =>
                def h(p: Int) = hash(p).toInt & 0xff
                Math.abs((h(p) << 24) | (h(p + 1) << 16) | (h(p + 2) << 8) | h(p + 3))
            }
        }
    }
}

위의 static 함수는 특정 주소에 대해서 항상 같은 값(256-bit Integers)의 가상 노드 256개를 생성하는데 사용됩니다.

scala> NodesHash.keysFromNodeInfo("192.168.0.1")
Vector(1613828804, 805888401, 1242242519, 551244407, 393482942, 1554231298, 1858188109, 1908435250, 660653046, 517057465, 904840963, 1476753659, 342604970, 515299240, 1288222429, 779656827, 2037399559, 1427420624, 177689550, 1892869121, 1517962223, 354243803, 304206007, 1957923660, 140770369, 760543721, 650264596, 1593120287, 1355313611, 962198633, 1004129914, 70477332, 2031925189, 723110065, 672204833, 947224700, 274888432, 1423223415, 1420168683, 85346857, 922546808, 1076865946, 122267179, 1913857477, 1555275447, 810469141, 640917472, 934794078, 1810234907, 1562144075, 18692565, 2117204431, 357504428, 1246787302, 350793204, 1908806167, 1970311178, 959447624, 707502366, 1094925119, 365776523, 496088576, 2051046536, 1364362178, 1977503229, 1042715249, 1955188796, 262565098, 861039408, 1763975204, 1684449900, 476868029, 1557139094, 1768818077, 1257410157, 1311555967, 2011755274, 1594419479, 1307099968, 536965055, 1277818912, 51397584, 1139482275, 2102188327, 1100274602, 75259803, 1032978855, 2007734034, 1228108687, 1714750577, 1341681246, 1021786423, 955596225, 1245949559, 76680524, 1193046374, 246758679, 200673478, 980763423, 1812328510, 438768596, 781744344, 3309151, 1555644692, 2018480450, 1671631621, 13394538, 1848291827, 1544671904, 699288205, 1547269108, 1661830641, 1120434381, 1599575767, 332561062, 1843220596, 1047282622, 572559546, 1743167194, 1351241939, 815458776, 1417568675, 1343005219, 137701485, 331167741, 1513564594, 97983972, 1392985810, 400112883, 1269103468, 173184182, 1722233430, 393017785, 1622242010, 2118965693, 1962593630, 981267341, 1811049954, 1841069162, 498743983, 253471631, 2052316130, 1775967946, 1538214396, 1876464562, 817865513, 1489190155, 1471206909, 820498304, 198786554, 515441683, 117266172, 1869298863, 1068853134, 836060665, 2016601691, 1317440967, 2131235450, 558919828, 1054833407, 696668126, 1952639747, 1551246507, 1125957890, 2098746237, 1674812526, 334826075, 559222942, 1469566133, 549285489, 133530737, 1148609705, 1836022993, 1607708207, 1171740179, 601021587, 1243444679, 356597326, 427834734, 352098878, 772217546, 149147634, 292207701, 633499874, 911994922, 602301400, 1834972875, 1488569339, 1770385813, 1463048607, 2095666593, 1665880366, 1486304109, 1994525154, 36382944, 104093429, 2011866619, 1132427057, 2048597077, 1730078752, 1482494370, 2112975991, 590764960, 1409136658, 1486699354, 1106788556, 497689019, 1350284219, 800565130, 2097534770, 1923284090, 837916073, 1875935334, 34289885, 487295685, 1683841472, 1561525968, 560648709, 1807621048, 157389896, 972261194, 1170739810, 407368554, 1978296563, 258024599, 1675683119, 1235620573, 2012928834, 1183544440, 1265599828, 1886705238, 620522618, 248828121, 1491055175, 123969986, 483018967, 1420757748, 1248835495, 12056062, 1242639703, 2135002342, 727649740, 1539556995, 660080873, 1926986892, 581446260, 1771061460, 582120956, 974952416, 1526406632, 858292446, 612970149, 837948651, 277877074, 253221013, 49730854)

5. Durable

만약 클라이언트가 Pusher와 연결이 끊기면 어떻게 될까? 메시지를 보내고 싶어도 못보내는 상황이 생길것 입니다. 그래서 보통 데이터가 유실될 수 있다고 보면 이를 방지하기 위해 데이터를 메모리에 저장하는 대신 디스크에 저장하는 방식으로 메시지 큐(Message Queue)를 사용할 수 있는데, 클라이언트에서 온 요청에 대한 응답이 실시간으로 필요한 경우에는 메시지 큐가 어울리지 않을 수 있습니다.

가장 많이 사용되고 있는 메시지 큐에는 Kafka와 RabbitMQ 등이 있는데, 기본적으로 Kafka는 순서가 중요하지 않은 대용량 스트리밍 메시지를 처리하기 위한 도구라서 순서 보장와 데이터 영속성이 중요할때 사용하는 RabbitMQ와는 용도에 차이가 있습니다. 하지만 Kafka의 토픽을 1로 하고 사용하면, 순서가 보장되므로 RabbitMQ처럼 사용할 수 있으니까 토픽 1로 사용하면 됩니다. 하지만 Kafka에 비해 RabbitMQ가 지원하는 기능이 훨씬 많기 때문에 처리할 메시지의 양이 엄청 크지 않다면 RabbitMQ를 사용하는게 개발하는 입장에서 좀 더 편할 수 있습니다.

RabbitMQ

RabbitMQ [1]

그리고 당장 Pusher가 사용될 애플리케이션의 특성을 봤을때 실시간 응답보다는 메시지의 영속성을 유지하는 것이 우선인 것으로 보였습니다. 그래서 Pusher와 클라이언트 간의 메시지 손실을 최소화하기 위해 연결이 끊긴 경우에는 RabbitMQ를 통해 메시지를 Pub/Sub 할 수 있도록 구현하였습니다.

6. SDK

Pusher의 클라이언트는 Android, iOS, 그리고 API 서버들일텐데.. 메시지를 보낼 API 서버에 Pusher의 클라이언트 코드를 작성하는 것도 제가 해야할 일 중 하나였습니다. 그래서 하는 김에 안드로이드에서도 사용 가능하도록 하나의 SDK를 만들면 좋을것 같다는 생각을 하였고, 개인적으로 요즘 제일 좋아하는 언어인 코틀린으로 SDK를 만들기 시작했습니다. 일단 SDK를 만들어 놓으면 제가 제일 먼저 사용하게 될 것이지만, 나 뿐만 아니라 나중에 SDK를 사용할 클라이언트 개발자들도 쉽게 사용할 수 있도록 만드는 것이 좋겠다는 생각을 했습니다. 아래는 SDK를 개발하면서 고려했던 사항들입니다.

// Request Model - Pusher SDK
abstract class Request[T] {
    abstract val action: String
    abstract def successResultOf(json: JObject): T

    def toJson: JObject
    def toJsonText: String = compact(render(toJson))
    def responseOf(json: JObject): Response[T] = {
        val resultCode: Int = (json \ "result").toOption match {
            case Some(JInt(v)) if v.isValidInt => v.toInt
            case Some(JString(v)) => v.toInt
            case _ => -1
        }

        resultCode match  {
            case 0 => Success(successResultOf(json))
            case _ => Failure(ServerErrorCode.from(resultCode), json)
        }
    }
}

// Response Model - Pusher SDK
sealed trait Response[T]

case class Success[T](result: T) extends Response[T]
case class Failure[T](errorCode: ServerErrorCode.Value, responseJson: JObject) extends Response[T]
case class ServerException(errorCode: ServerErrorCode.Value) extends Exception(s"Server Exception $errorCode")

object ServerErrorCode extends Enumeration {
    val InvalidJson: ServerErrorCode.Value = Value(101)
    val InvalidAction: ServerErrorCode.Value = Value(102)
    val MissingParams: ServerErrorCode.Value = Value(103)
    val InvalidParams: ServerErrorCode.Value = Value(104)
    val UnexpectedInternalError: ServerErrorCode.Value = Value(309)
    val UnexpectedExternalError: ServerErrorCode.Value = Value(509)
    val UnknownError: ServerErrorCode.Value = Value(999)

    def from(errorCode: Int): ServerErrorCode.Value =
        ServerErrorCode.values.find(_.id == errorCode).getOrElse(UnknownError)
}
  • 표준 예외 사용: Pusher에 정의된 Exception을 처리하는 것을 제외하고 나머지는 Java에서 사용되는 표준 예외를 사용해 SDK 사용자가 알기 쉽고 사용하기 편리하게 만들자.
  • 내부 캐시 정의: SDK 내부에 Storage를 구현하여 Initialize 단계에서 자신의 Identity를 등록, 메시지를 보낼 수 없는 상황이면 해당 메시지를 저장, 그리고 서버와의 변경 사항이 없다면 캐시된 데이터를 사용하도록 만들자.
  • Retry 로직 구현: Pusher와 연결이 끊긴 경우, Pusher와 재연결하기 위한 로직을 구현하자.
  • 메시지 영속성 유지: Pusher와 연결이 끊긴 경우, 해당 메시지는 Storage에 저장되고 RabbitMQ에 publish를 하거나 Pusher와 다시 연결이 되면 메시지를 해당 채널로 전송한다.
  • 버전 관리: 변경 사항이 적절히 적용되지 않으면 기능에 장애가 생길 수 있으므로 Nexus Repository를 통해 Pusher와 버전을 동기화하자.

끝으로

포스팅에서 예제 코드들은 스칼라로 작성하였는데 사실 Pusher는 자바로 구현하였습니다. 함수형 프로그래밍에 관심이 많아 스칼라로 무언가를 만드는 것에 익숙해져 있었는데, 생각해보니 자바로 어떤 프로그램을 처음부터 끝까지 만들어본 적이 오래된것 같아 자바를 선택하게 되었습니다. 개발하는 동안 ‘그냥 스칼라로 할 걸’ 하며 후회한 적이 많았지만, 이전에 공부했었던 이펙티브 자바를 다시 펼쳐 보면서 앱을 더 잘 만들기 위해 더 많은 노력을 했던것 같습니다.

푸시 알림 시스템을 개발할 당시에 아침에는 Pusher를 개발하고 오후에는 API 서버, 저녁에는 스마트 컨트랙트와 Pusher SDK를 개발하였습니다. 특별한 일이 없으면 주말에도 개발을 했고 하루에 코딩을 12시간 넘게 한적도 있는데 힘들다는 생각보다는 너무너무 재미있었습니다. 혼자서 Spring과 같은 정형화된 프레임워크의 도움없이 밑단부터 직접 개발해 볼 수 있는 기회가 많지 않을것 같은데… 정말 값진 경험이었다고 생각합니다.


Reference