Stream Backpressure의 이해

Stream, Data Pipeline and Backpressure

Featured image

요즘 Youtube를 보다보면, 추천 알고리즘에 의해 하수구를 뚫는 영상을 보게된다.
추천을 해준 이유도 궁금하지만 어느순간 그걸 보고있는 내가 더 신기하다.

우리는 살면서 한번쯤은 액체가 역류하는 순간을 마주치게 된다.
화장실의 변기나 세면대, 주방의 싱크대 가끔 페트병에 쌀을 넣을 때도 넘지는 일을 마주친다.

왜 넘칠까? 바로 backpressure(배압) 때문에 넘치게 된다.

Backpressure(배압)은 무엇인가?

파이프를 통한 유체 흐름에 반하는 저항, 힘을 말한다. 액체나 증기가 관을 통해 배출 될 때,
유체가 흐르는 방향과 반대 방향으로 작용하는 저항 압력이다.

it is a resistance or force opposing the desired flow of fluid through pipes (wikipedia)
wikipedia - Back_pressure

흔히 ‘역압’ 이라고도 부른다. 배압이 생기는 이유는 마찰이나 수두압, 탱크의 내압등이 이유가 되며
이를 엔지니어들이 아래 그림과 같이 계산하여 설계한다.

내연기관 엔진 다루는 엔지니어들도 물론 이를 이용하여 설계 및 구현한다.
옛날 증기기관을 떠올리면 생각나는 압력 게이지도 역압측정에 활용된다.


소프트웨어랑 무슨상관인가?

소프트웨어 개발을 하면서 만나게 되는 용어들이 있다. Stream과 Pipeline이다.
리액티브프로그래밍은 데이터 스트림을 지원하기 위해 나온 개념이라고 소개하고 있다.

In computingreactive programming is a declarative programming paradigm concerned with data streams  and the propagation of change.
- wikipedia - Reactive_programming

Kafka 또한 Steams Architecture를 제공하면서 스트림을 언급하고 있다.

AWS는 데이터 파이프라인 서비스 패키지를 판매하고 있다. 패키지 구성을 보면 clickstream이라고 또 스트림이 등장한다.

신기한것은 스트림과 파이프라인 모두 메타포(Metaphor)로 실제 세상에 있는 파이프(Pipe)와 그 흐름(Stream)으로 삼고 있다.
단지 그 내용물만 액체와 기체가 아닌 ‘데이터’ 일 뿐이다. 이런 스트림은 현실세계와 똑같이 Backpressure(배압)이 존재 한다.

Spark Streaming

Spark는 Kafka로 부터 데이터를 수신 받지만, 배압의 가능성이 있어서 이에 대한 옵션을 제공한다.

Streaming automatically figures out the rate limits and dynamically adjusts them if the processing conditions change. This backpressure can be enabled by setting the configuration parameter spark.streaming.backpressure.enabled to true.

Fluentbit

CNCF의 Log Processor 및 Forwarder인 Fluentbit 또한 Backpressure를 제공한다.

이외에도 다양한 Data Stream, Pipe 관련 기술에도 Backpressure가 존재한다.


무슨일이 생기나?

Backpresure가 이상적으로 잘 동작하는 환경이라면 모두가 일정한 처리양을 유지하며 진행한다.
아래 그림을 예로 들면 Fast Producer가 벽돌을 기가 막히게 Slow Consumer에게 전달한다.

그런데, 이름에서 유추 할 수 있듯이 일을 처리하는 Slow Consumer가 너무 일이 힘든 나머지 천천히 일을하다 블록이 머리 끝까지 쌓여버렸다.
그래서 인부는 도망쳐버렸다.

이렇게 재밌는(?) 사례로 살펴본 역압의 부수효과(Side-effect)현상은 소프트웨어의 환경에서는 CPU와 메모리, 그리고 데이터 Drop 현상이 발생한다. Network I/O, DISK I/O, Out of Memory 등을 겪을 수 있다.

더 구체적인 예를 보자. 아래와 같이 서버 세대가 있다. 데이터를 초당 10개씩 전달하는 서버가 있고, 이를 처리하는 서버가 초당 10개처리용 1대, 초당 5개처리용 1개가 있다.
여기서 문제는 메시지 처리속도가 차이가 난다는 점이다.

이런 상황에서 마지막 5/sec 서버는 Queue를 보유하고 있다. 만약 이 구조로 운영을 하다가 마주칠 Backpressure 상황은 다음과 같다.

  1. 마지막 서버의 Queue가 가득찼다.
  2. 더이상 서비스 하지 못함을 알리는 응답을 중간서버에게 전달한다.

이렇게 더이상 서비스 하지 못함을 받은 중간서버는 두가지 선택이 남았다.

중간서버가 Option1을 선택한다면 데이터를 전달한 맨 처음서버들에게도 유사한 방식의 선택을 강요 할 수 있다.

만약 강요받은 서버가 Docker Container 였다면, Kernel의 OOM Killer에 의해 강제로 죽임을 당해서 우리 서비스도 죽고, 우리도 일때문에 죽을수도 있다.(?)

On a high load environment with backpressure the risks of having high memory usage is the chance to get killed by the Kernel (OOM Killer). - https://docs.fluentbit.io/manual/administration/buffering-and-storage

심지어, Docker의 logging driver로 설정을 해두었다면, 해당 로그데이터가 flush 되지 않으면,
해당 컨테이너는 종료가 불가능하다.


해결방법은?

다행히도 내용물이 액체나 기체가 아니라서 베르누이 법칙이나, 터뷸런스 현상등은 마주칠일이 없다.

유체의 속도가 높은 곳에서는 압력이 낮고, 유체의 속도가 낮은 곳에서는 압력이 높다 - 베르누이원리

폭이 넓어지면 압력이 줄어들어 유체가 직선으로 흐르지 못하고 와류하는 현상 - 터뷸런스

Buffer

우선 Buffer를 고려한다. 위에서도 자연스레 Queue를 언급하였는데, 이 또한 Buffer구현에 사용된 데이터구조중 하나다.

컴퓨팅에서, 버퍼(buffer, 문화어: 완충기억기)는 데이터를 한 곳에서 다른 한 곳으로 전송하는 동안 일시적으로 그 데이터를 보관하는 메모리의 영역이다. 버퍼링(buffering)이란 버퍼를 활용하는 방식 또는 버퍼를 채우는 동작을 말한다. 다른 말로 ‘큐(Queue)’라고도 표현한다. - https://ko.wikipedia.org/wiki/버퍼_(컴퓨터_과학)

이 버퍼는 고정길이버퍼와 가변길이버퍼로 구현 할 수 있다. 고정길이버퍼는 현재 버퍼가 가득차면 신규 수신된 데이터를 거절한다. 가변길이버퍼는 추가 길이의 버퍼를 추가 생성 혹은 신규생성한다.
고정길이와 가변길이 모두 좋은 해결 방법이지만 각각의 문제점이 있다.

버퍼를 사용하여 어느정도 개선은 할 수 있지만 그 한계가 존재한다.
만약 버퍼를 사용하여 운영을 할 경우라면 종국에 데이터 파이프라인의 전체 구조에서 감당 가능한 데이터의 처리양을 파악하고, 이것이 추가 될 경우 어느 한 구간에서 데이터를 버리는 작업을 진행해야 한다.

Cloud Native Computing Foundation(CNCF)의 Log Fowarder인 Fluentbit 에서는 아래와 같이 구성 할 수 있다. 메시지를 보내는 주기는 1초로 하고, 각 Input에서의 메모리 버퍼 최대양을 1M으로 설정하고, 스토리지 버퍼는 5M으로 설정 할 수 있다. ( mem_buf_limit = 1M , storage.backlog.mem_limit=5M ) 해당 설정이 넘어가면 메시지는 버린다.

[SERVICE]
    flush                     1
    log_Level                 info
    storage.path              /var/log/flb-storage/
    storage.sync              normal
    storage.checksum          off
    storage.backlog.mem_limit 5M

[INPUT]
    name                      cpu
    storage.type              filesystem
    mem_buf_limit             1M

[OUTPUT]
    name                      stackdriver
    match                     *

Experiment

Fluentbit을 이용하여 버퍼를 통한 Backpressure 처리를 확인해보자.

log-gen

log-gen은 쉘스크립트로 별도로 도커라이징 작업이 필요하다.

#!/bin/sh

i=0
echo $1 $2 $3 $4

sleep $4

while [ $i -ne $2 ]
do
  j=0
  while [ $j -ne $1 ]
  do
      echo '['$j'/'$i']Lorem ipsum dolor sit amet, consectetur adipiscing elit. Phasellus sagittis sollicitudin purus, a se'
      j=$((j+1))
  done

  sleep $3
  i=$((i+1))
done

echo 'done'

fluentbit1

[SERVICE]
    Flush                     1
    Daemon                    off
    log_Level                 info

[INPUT]
    Name   forward
    Listen 0.0.0.0
    Port   24225
    Mem_Buf_Limit 10kb

[OUTPUT]
    Name forward
    Match *
    Tag fluentbit
    Host fluentbit
    Port 24224

fluentbit2

[SERVICE]
    Flush                     1
    Daemon                    off
    log_Level                 info

[INPUT]
    Name   forward
    Listen 0.0.0.0
    Port   24224
    Mem_Buf_Limit 10kb

[OUTPUT]
    Name file
    Match *
    Path /fluent-bit/log/test/

실험결과 예상보다 더 갯수가 적었다. 왜냐하면 Docker Log를 기반으로 테스트 환경을 구성하면서 발생한 추가 데이터들 때문이다.
아래와 같이 우리가 구성한 문자열 외에 컨테이너정보, 시간정보, 소스정보 등과 Json Syntax 등이 추가되었다. 거의 데이터의 크기가 3개 정도 추가되었다.

따라서 310byte기준으로 초당 약 30건 (10kb / 310b) 정도인 300건이 최대 건수이며, 이외 Network I/O, Memory I/O 등등 다양한 환경변수로 인해 총 190개의 로그가 저장되었다.

// Before 107 byte
[92/0]Lorem ipsum dolor sit amet, consectetur adipiscing elit. Phasellus sagittis sollicitudin purus, a se"

// After 307 byte
fluentbit: [1638110977.000000000, {"source":"stdout","log":"[92/0]Lorem ipsum dolor sit amet, consectetur adipiscing elit. Phasellus sagittis sollicitudin purus, a se","container_id":"63bcc691994e562dd1816fc05d4648a513e17ff25bb464f4cda3066f4e5f7eff","container_name":"/logging-pipe-testing-set_log-gen1_1"}]

이때, 가용가능한 범위에서 버퍼 늘리고, flush time을 조율하면 Drop 되는 데이터를 줄일 수 있다.


ReativeX

프로그래밍을 통해서 Backpressure처리도 가능하다. 특히 ReactiveX에서 또한 Backpressure에 관련한 기능을 구현하도록 명시하고 있다.
현재 RxGroovy, RxJava, RxJs, RxScala에서 지원을 하고 있다.

RxJava 를 살펴보면 아래의 기능을 제공하고 있다. (https://riptutorial.com/rx-java/example/9906/the-onbackpressurexxx-operators 참고)


Pull based backpressure

지금까지 위에서 다뤘던 내용들은 Push 기반의 Backpressure 처리방법이었다. 이 방식들은 데이터의 유실을 기저로 동작하고 있다.
Pull 기반의 구조 도 고민해 볼 수 있다. Subscriber가 자신이 현재 처리 가능한 범위에서 데이터를 받아가는 구조다.

Kafka 또한 해당 pull 방식을 이용하고 있다고 한다.

Kafka Streams leverages Kafka’s consumer client behind the scenes,
which works with a pull-based messaging model that allows downstream processors to control the pace at which incoming data records are being read.
- https://docs.confluent.io/platform/current/streams/architecture.html#backpressure


마치며

Backpressure는 Stream과 관련된 곳 어디서나 만날 수 있다. 본 글을 읽고 모든 부분을 해결하기를 원하지는 않는다. 다만, 더 좋은 코드를 고민 할 수 있도록 이해를 돕기 위해 작성 한 글이다.
추후 또 만나더라도 당황하는 일은 적었으면 한다. Backpressure로 인한 Side-effect를 없애기는 어려울 것으로 생각한다. 문제 상황이 각기 다르기에 최선의 답이 있을 뿐이다.
정답은 없다. 다만 경험이 쌓여서 누군가에게 더 좋은 해결방법을 제안 할 수 있는 개발자가 되자.


참고