API Gateway Throttling 구현

기본 원리와 구현 그리고 오픈소스를 통한 이해

Featured image

마이크로서비스, MSA, Micro Service Architecture. MSA(Micro Service Architecture) 불과 6년전만 하더라도 대한민국 개발생태계에서 외치지도 들리지도 않던 용어다. 그럼에도 2021년 11월 오늘에는 대부분의 개발자들이 한번쯤은 들어본 용어가 되어버렸다.

이번 글에서는 MSA의 장단점을 써 내려가지는 않을것이다. 다만, MSA 패턴의 하나의 역할로 자리잡고 있는 API Gateway에 대해서 대략적으로 살펴보고, 기능 중 하나인 API Throttling과 구현에 대해서 알아보려고 한다.

(다만, 실제 상용서비스에 적용하려는 분들은 수많은 훌륭한 서비스들이 있으니 직접 구현하는 것 보단 이용하는 쪽에 무게를 두시는게 좋을 것 같다. 그래도 이 글이 조금이나마 서비스 선택과 이해에 도움이 될 수 있길 기원한다.)


API Gateway?

API Gateway는 MSA의 패턴중의 하나다. 특정 상품이나 서비스를 한정하여 지칭하는 것은 아니다. API Client와 Server의 사이 구간에 위치하여 다양한 목적으로 사용되는 역할을 한다.

API Gateway 도입시 여러가지의 이점들이 많다.

  • 인증 및 권한 부여
  • 서비스 검색 통합
  • 응답 캐싱
  • 정책, 회로 차단기 및 QoS 다시 시도
  • 속도 제한
  • 부하 분산
  • 로깅, 추적, 상관 관계
  • 헤더, 쿼리 문자열 및 청구 변환
  • IP 허용 목록에 추가

API Throttling?

API 요청에 속도와 횟수를 제한하는 것을 말한다. Rate Limit 기능이다. API의 경우 사용자 직접 API Client Tool을 사용해서 요청을 하는 일은 적다. API 호출테스트를 할 때를 제외하고는 대부분이 자동화된 프로세스에 연동하여 사용하고자 한다.

특정 사용자가 만든 프로그램이 버그가 있어서 요청량이 폭증하면 어떻게될까? 아니면 악의적인 사용자가 DoS(Denial of service)와 같은 공격을 시도하면 어떻게 될까? 우리의 서비스 가용성(Availability)에 문제가 생긴다. 따라서 보안을 위해서도 Api Throttling은 필요하다.

모든 사용자는 정상적인 이용을 하고 있다고 하자. 이때 A 회사와 B회사 있다. A회사의 API 요청건이 B회사의 요청건을 훨씬 상회한다. 이때, A회사보다 B회사와의 계약금액이 훨씬 크다. 이 때 어떤 선택을 할 수있을까? Scale Up이나 Scale Out을 통해서 서버 자원을 API제공회사의 부담으로 확장을 시키는 방법이 있다.

그런데, API 제공회사에서 서버자원을 확장하는 비용이 API 서비스를 이용하는 회사들과 계약한 금액보다 높다. 이때는 어떤 선택을 할 것인가?

위와 같이 난처한 사항을 막기위해서라도 API를 개발할때는 API Throttling 을 도입하여, 프리세일즈나 테크세일즈 분들이 반드시 SLA(Service Level Agreement)에서 협의의 대상으로 삼을 수 있도록 도와야 한다.


API Throttling 구현

API Throttling의 구현은 일반적으로 BucketWindow를 컨셉으로 한 알고리즘이 사용된다.

본 글에서는 우선 가장 이해하기 쉬운 Bucket 알고리즘을 중심으로 구현을 해보고자 한다.


Leaky Bucket 개요

직역하면 ‘구멍난 양동이’. 우리 정서에는 ‘깨진독에 물 붓기’ 쯤 되겠다.

구멍의 크기만큼 물이 빠져나가고, 물을 넣는 속도가 물이 담기는 양이 빠져나가는 속도보다 빠르면 양동이가 넘친다.

Leaky Bucket 알고리즘 또한 비슷한 원리다. 고정된 버킷최대깊이(T)의 버킷에 네트워크 요청 유입속도의 한계값(τ)을 정한다. 그리고 지정된 속도에 맞춰서 일정하게 처리한다. 만약, 한계값을 초과하면 요청은 버린다.

API Gateway가 HTTP를 처리하는 상황에서는 아래와 같은 맥락으로 이해 할 수 있다.

고정된 API 요청 최대량(T)에 Http request의 한계값(τ)을 정한다. 그리고 지정된 속도에 맞춰서 일정하게 HTTP Request를 처리 및 각 서비스로 포워딩한다. 그리고 한계값을 초과하면 요청은 HTTP 429 Response를 반환한다.


Leaky Bucket 구현

5초당 2건씩 최대 100건을 처리 할 수 있는 아주 작은 API Gateway를 구현하자. (쉬운 검증을 위해 처리양은 제한된 크기로 늘려뒀다.)

우선 요청된 순서대로 처리를 해야한다. ADT로 Queue를 선택했다. 오.. 적절한 선택이었다. 알리바바에서도 guava 를 사용하고 있는데, 이미 아래와 같이 설명하고 있다.

우선 빠른 구현(글쓴이 기준)에 맞춰서 요즘 사용하는 Node.js로 쉽게 작성을 해보자. 그런데, 아마 이 글을 읽는 분들 대부분은 한국에서 개발자로 일하시며 스프링과 자바로 개발중이신분들이 많을 것 같다.

그래서 선택은 Nest.js로 개발해보자.. 이유는 간단하다. 일단 비슷하게 생겼다. (그리고 고양이가 있다. 예쁘다. 멋지다.)

DI(Dependency Injection)하는 느낌도 비슷하다.

구현 클래스는 Middleware에서 버킷 Task 작업에 참여토록 하였으며, 시간별 소비는 @Interval 모듈을 활용했다.

LeakyBucketMiddleware

import {
  ForbiddenException,
  Injectable,
  Logger,
  NestMiddleware,
} from '@nestjs/common';
import { NextFunction, request } from 'express';

@Injectable()
export class LeakyBucketMiddleware implements NestMiddleware {
  static readonly requestQueue = [];
  static readonly rate = { size: 2, interval: 5000 };
  readonly capacity = 100;
  private readonly logger = new Logger(LeakyBucketMiddleware.name);

  public readonly isFullBucket = () =>
    LeakyBucketMiddleware.requestQueue.length === this.capacity;

  private add(req: Request, res: Response, next: NextFunction): boolean {
    if (this.isFullBucket()) {
      return false;
    }
    LeakyBucketMiddleware.requestQueue.push({ req, res, next });
    return true;
  }

  use(req: Request, res: Response, next: NextFunction): any {
    if (this.add(req, res, next)) {
      this.logger.log(`${req.method} ${req.url} - ${new Date()}`);
    } else {
      throw new ForbiddenException();
    }
  }
}

TaskService

@Injectable()
export class TaskService {
  private readonly logger = new Logger(TaskService.name);
  private static readonlyrate= LeakyBucketMiddleware.rate;

  @Interval(TaskService.rate.interval)
  private consume() {
    this.logger.log('leak.. ' + LeakyBucketMiddleware.requestQueue.length);
    for (let i = 0; i < TaskService.rate.size; i++) {
      const job = LeakyBucketMiddleware.requestQueue.shift();
      job?.next();
    }
  }
}

테스트

미들웨어에서 처리이후, Contoller에서 받은 요청을 Http Client를 이용하여 전달해주면 될 것이다.

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  // @see : https://docs.nestjs.com/controllers#routing
  @All('*')
  getHello2(@Req() request: any): string {
    return 'HttpClient 처리 및 반환';
  }
}

장점:

단점:

의견

Usecase :

sample-code


Token Bucket 개요

어린시절 누구나 한번쯤(?) 시도 해봤을 법한 돼지저금통에서 동전빼서 오락실가서 게임하기와 비슷하다. *오락실을 모르면, 피시방으로…)

Bucket에 저장 할 수 있는 최대 토큰의 숫자를 정하고, 일정주기로 해당 토큰을 채운다.

요청마다 토큰을 1개씩 사용하며, 토큰이 없으면 요청 메시지를 반환한다.

API Gateway가 HTTP를 처리하는 상황에서는 아래와 같은 맥락으로 이해 할 수 있다.

가용가능한최대 범위의 토큰수용한계(T)에 대비하여 토큰최대발행량(τ)을 정한다. 그리고 일정 주기로 API Token을 채운다. 사용할 토큰이 없는 API Request에 대해서 HTTP 429 Response를 반환한다.


Token Bucket 구현

범용적으로 많이 활용되는 알고리즘이라서 별도의 구현보다는 사례 하나를 제시하고자 한다.

Spring Cloud에서 어떤 이유에서인지는 모르겠으나, Zuul 외 Spring Cloud Gateway를 공식으로 내세우고 있다spring-cloud-gateway

Spring Cloud Gateway에서도 Token Bucket 을 이용하고 있다.spring-cloud-gateway-RedisRateLimiter.java#L227

구현은 Reactor 를 이용하여 Async, non-blocking 기반으로 했다. 기반은 Redis와 함께 동작한다.

간혹 burst라는 용어가 보이는데, 이는 최대 처리가능한 범위로 이해하면 된다.

스프링에서 사용은 config로 이렇게 설정하면 된다

spring:
  cloud:
    gateway:
      routes:
        - id: route1
          uri: http://localhost:8081
          predicates:
            - Path=/backend
          filters:
          - name: RequestRateLimiter
            args:
              redis-rate-limiter.replenishRate: 500
              redis-rate-limiter.burstCapacity: 1000
              redis-rate-limiter.requestedTokens: 1

Rate Limit과 관련된 구조는 대략 아래와 같은 관계를 같고 있는 것으로 보인다.

RedisRateLimiter

public class RedisRateLimiter extends AbstractRateLimiter<RedisRateLimiter.Config> implements ApplicationContextAware {
/**
	 * This uses a basic token bucket algorithm and relies on the fact that Redis scripts
	 * execute atomically. No other operations can run between fetching the count and
	 * writing the new count.
	 */
	@Override
	@SuppressWarnings("unchecked")
	public Mono<Response> isAllowed(String routeId, String id) {
		if (!this.initialized.get()) {
			throw new IllegalStateException("RedisRateLimiter is not initialized");
		}

		Config routeConfig = getConfig().get(routeId);

		if (routeConfig == null) {
			if (defaultConfig == null) {
				throw new IllegalArgumentException("No Configuration found for route " + routeId);
			}
			routeConfig = defaultConfig;
		}

		// How many requests per second do you want a user to be allowed to do?
		int replenishRate = routeConfig.getReplenishRate();

		// How much bursting do you want to allow?
		int burstCapacity = routeConfig.getBurstCapacity();

		try {
			List<String> keys = getKeys(id);

			// The arguments to the LUA script. time() returns unixtime in seconds.
			List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
					Instant.now().getEpochSecond() + "", "1");
			// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
			Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
					// .log("redisratelimiter", Level.FINER);
			return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
					.reduce(new ArrayList<Long>(), (longs, l) -> {
						longs.addAll(l);
						return longs;
					}) .map(results -> {
						boolean allowed = results.get(0) == 1L;
						Long tokensLeft = results.get(1);

						Response response = new Response(allowed, tokensLeft);

						if (log.isDebugEnabled()) {
							log.debug("response: " + response);
						}
						return response;
					});
		}
		catch (Exception e) {
			/*
			 * We don't want a hard dependency on Redis to allow traffic. Make sure to set
			 * an alert so you know if this is happening too much. Stripe's observed
			 * failure rate is 0.01%.
			 */
			log.error("Error determining if user allowed from redis", e);
		}
		return Mono.just(new Response(true, -1));
	}

위 코드를 읽다보면 이제 설정한 burst와 처리량을 기반으로 Redis에서 Lua Script를 동작시키도록 하는 내용이 있다.

Flux<List<Long>> flux =
        this.redisTemplate.execute(this.script, keys, scriptArgs);
// .log("redisratelimiter", Level.FINER);

해당하는 Lua Script의 정보는 GatewayRedisAutoConfiguration 에 나와있다.

GatewayRedisAutoConfiguration

여기서 late limiter 로직을 담고있는 스크립트 파일위치를 제공한다. 바로 META-INF/scripts/request_rate_limiter.lua 가 그것이다.

package org.springframework.cloud.gateway.config;

class GatewayRedisAutoConfiguration {

	@Bean
	@SuppressWarnings("unchecked")
	public RedisScript redisRequestRateLimiterScript() {
		DefaultRedisScript redisScript = new DefaultRedisScript<>();
		redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
		redisScript.setResultType(List.class);
		return redisScript;
	}
//....
}

META-INF/scripts/request_rate_limiter.lua

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

더 자세한 코드분석은 생략하겠다. 이렇게 Redis에서 Lua script로 작성된 로직의 결과는 RequestRateLimiterGatewayFilterFactory 에서 사용된다.

/**
 * User Request Rate Limiter filter. See https://stripe.com/blog/rate-limiters and
 */
public class RequestRateLimiterGatewayFilterFactory extends AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> {
@Override
	public GatewayFilter apply(Config config) {
		KeyResolver resolver = (config.keyResolver == null) ? defaultKeyResolver : config.keyResolver;
		RateLimiter<Object> limiter = (config.rateLimiter == null) ? defaultRateLimiter : config.rateLimiter;

		return (exchange, chain) -> {
			Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);

			return resolver.resolve(exchange).flatMap(key ->
					// TODO: if key is empty? // 여기서 사용 됨 
					limiter.isAllowed(route.getId(), key).flatMap(response -> {
						// TODO: set some headers for rate, tokens left

						if (response.isAllowed()) {
							return chain.filter(exchange);
						}
						exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
						return exchange.getResponse().setComplete();
					}));
		};
	}

장점

단점

Usecase


참고