본문 바로가기
Python/FastAPI

FastAPI can not use request body in middleware / 미들웨어에서 request body 사용 불가 / FastAPI(starlette) AGSI flow

by ahsung 2022. 4. 10.

starlette는 경량 ASGI를 구현할 수 있는 웹프레임워크이며,

FastAPI는 starlette를 wraping하여 http 서비스(웹 혹은 API)를 간단하게 만들 수 있는 웹프레임워크입니다.

https://www.starlette.io/

 

Starlette

✨ The little ASGI framework that shines. ✨ Introduction Starlette is a lightweight ASGI framework/toolkit, which is ideal for building async web services in Python. It is production-ready, and gives you the following: A lightweight, low-complexity HTTP

www.starlette.io

 

 

FastAPI에서는 route에 명시한 함수(endpoint)에 도달하기전과 통과한후 사이에 로직을 추가할 수 있는 미들웨어 계층을 제공합니다.

 

미들웨어 코드 예시

from fastapi import FastAPI

app = FastAPI(
    title="Hello"
)

@app.middleware('http')
async def middleware(request: Request, call_next):

    requestEndpoint = str(request.method) + " " + str(request.url.path)
    client = str(request.client.host) + ":" + str(request.client.port)

#     미들웨어에서 body는 사용하면 안됨.
#     body = await request.body()

    print(requestEndpoint, client)
    
    response: Response = await call_next(request)

    return response

위 예시처럼 미들웨어를 통해서 call_next의 전후로 로깅등 필요한 로직을 추가할 수 있습니다.

다만, 현재 미들웨어는 http만 제공하며 미들웨어에서 body를 load하면 정상적으로 동작하지 않습니다.

(테스트 해보지 않았지만 uvicorn(웹서버)에서 TLS 처리후 Fast API app에 http 프로토콜이 넘어오지 않을까 추측)

call_next로 들어간 이후에 내부에서 body를 사용할때 무한 wait이 걸리거나 에러가 발생 하게 됩니다.

 

 

Request 클래스 코드 분석

# starlette.requests 파일 

class Request(HTTPConnection):
    def __init__(
        self, scope: Scope, receive: Receive = empty_receive, send: Send = empty_send
    ):
        super().__init__(scope)
        assert scope["type"] == "http"
        self._receive = receive
        self._send = send
        self._stream_consumed = False
        self._is_disconnected = False

    @property
    def method(self) -> str:
        return self.scope["method"]

    @property
    def receive(self) -> Receive:
        return self._receive

    async def stream(self) -> typing.AsyncGenerator[bytes, None]:
        if hasattr(self, "_body"):
            yield self._body
            yield b""
            return
            
        if self._stream_consumed:
            raise RuntimeError("Stream consumed")

        self._stream_consumed = True
        while True:
            message = await self._receive()
            if message["type"] == "http.request":
                body = message.get("body", b"")
                if body:
                    yield body
                if not message.get("more_body", False):
                    break
            elif message["type"] == "http.disconnect":
                self._is_disconnected = True
                raise ClientDisconnect()
        yield b""

    async def body(self) -> bytes:
        if not hasattr(self, "_body"):
            chunks = []
            async for chunk in self.stream():
                chunks.append(chunk)
            self._body = b"".join(chunks)

        return self._body

FastAPI(내부적으로 starlette)의 body를 load하는 동작

  1. 45줄의 body() 함수는 stream(22줄 함수) -> _receive(33줄 호출)을 통해 request body 값 획득
  2. _body 멤버 변수에 저장, _stream_consumed = True
  3. 한번 소비된 request body는 _body에서 꺼내서 재사용

미들웨어 계층에서 request.body() 사용시 변수 값을 확인해보면(로깅 코드를 삽입 or 디버깅 브레이크 포인트)

  • 미들웨어 계층에서는 request.body()를 호출후 _body에 저장된 상태 ok
  • call_next 내부에 동작되는 함수들에서는 request._body와 _stream_consumed이 존재하지 않음
    • call_next, APIroute 클래스 계층, route(endpoint)함수에서 확인
  • _receive()함수에서 무한 wait을 발생

 

 

 

receive 함수의 동작

 # uvicorn.protocals.http.h11_impl.py
 
 async def receive(self):
        if self.waiting_for_100_continue and not self.transport.is_closing():
            event = h11.InformationalResponse(
                status_code=100, headers=[], reason="Continue"
            )
            output = self.conn.send(event)
            self.transport.write(output)
            self.waiting_for_100_continue = False

        if not self.disconnected and not self.response_complete:
            self.flow.resume_reading()
            await self.message_event.wait()
            self.message_event.clear()

        if self.disconnected or self.response_complete:
            message = {"type": "http.disconnect"}
        else:
            message = {
                "type": "http.request",
                "body": self.body,
                "more_body": self.more_body,
            }
            self.body = b""

        return message

uvicorn 웹서버의 receive 함수를 살펴보면 14줄에서 asyncio.Event() 객체인 message_event.wait() 로직은 message_event.set()이 되지 않으면 계속 block되는 함수입니다.

 

uvicorn은 client에서 더 이상 받을 data가 없는 경우, message_event.set()을 사용 할 수 없는 분기문으로 빠지고 이미 들어온 Task는 callback(response 후처리)을 통해서 비동기적으로 마무리 됩니다.

 

즉 이미 커스텀 미들웨어에서 receive를 호출후 다음 ASGIapp 계층에서 receive를 다시 호출하게되면 message_event.set()이 더 이상 발생하지 않기 때문에 message_event.wait()에서 block되어 버립니다.

그래서 FastAPI(Starlette)의 Request 객체는 한번 body를 얻고나면 receive를 재호출하지 않고 저장한 _body 멤버를 재사용하게 설계하였습니다.

 

 

그렇다면 도대체 request.body()를 호출후에 어째서 저장한 _body를 재사용하지 않고 receive 함수를 재호출하는 걸까요?

 

 

ASGI 데이터 흐름

ASGIapp(ASGI 인터페이스를 따르는 객체)는 기본적으로 scope, receive, send 파라미터를 갖고 있습니다.

Fast API (Starlette)는 최초의 ASGIapp 객체이며 self.app = ASGIapp(app=~~) 방식으로 하위에 ASGIapp 객체를 갖고 

계층(layer) 구조를 갖습니다.

 

scope : 전반적인 메타데이터를 가지고 있는 dict 구조체입니다. (header, url path, protocal type, asgi version, type ...etc...)

receive : 웹서버쪽에서 데이터(request body)를 받는 함수 # type에 따라 다를 수 있음

send : 웹서버로 데이터(response)를 보내는 함수 # type에 따라 다를 수 있음

 

예시 

uvicorn.run(app=FastAPI) -> FastAPI(scope, receive, send) 
-> middleware(scope, receive, send) -> call_next(request)
-> APIRouter(scope, receive, send) -> APIRoute(scope, receive, send) 
-> route_handler(request) -> endpoint(**kwargs)

 

 

 

call_next 함수 코드 분석

    # starlette.middleware.base
    # 28줄부터
    
    async def call_next(self, request: Request) -> Response:
        loop = asyncio.get_event_loop()
        queue: "asyncio.Queue[typing.Optional[Message]]" = asyncio.Queue()

        scope = request.scope
        receive = request.receive
        send = queue.put
        
        async def coro() -> None:
            try:
                await self.app(scope, receive, send)
            finally:
                await queue.put(None)

        task = loop.create_task(coro())
        message = await queue.get()
        if message is None:
            task.result()
            raise RuntimeError("No response returned.")

        assert message["type"] == "http.response.start"

        async def body_stream() -> typing.AsyncGenerator[bytes, None]:
            while True:
                message = await queue.get()
                if message is None:
                    break
                assert message["type"] == "http.response.body"
                yield message.get("body", b"")
            task.result()

        response = StreamingResponse(
            status_code=message["status"], content=body_stream()
        )
        response.raw_headers = message["headers"]

        return response
  1. 10줄 -> send를 queue.put 함수로 변경
  2. 12줄 ->  다음 계층의 app(보통 fastapi.routing.APIRouter)을 실행하는 코루틴 정의
  3. 18줄 -> 이벤트 루프에 넣고 실행, 다음 계층의 app은 send가 웹서버에 보내는 것이 아닌 queue에 쌓음
  4. 26줄 -> queue에 쌓인 send(message)값을 iterable하게 반환하는 generator 함수 정의
  5. 36줄 -> response 객체 생성, return response

 

 

FastAPI의 ASGIapp 계층의 데이터 흐름을 보면 뜬금 없이 ASGIapp들 사이에 call_next는 ASGI 인터페이스를 따르지 않고

Starlette 자체적으로 사용하는 Request 객체를 사용하고 있습니다.

 

물론 Request도 내부적으로 scope, receive, send를 갖고 있으나 문제는 ASGI 표준에서 scope는 body값을 갖지 않습니다.

즉 계층이 넘어가면서 body를 넘겨주지 못하며 call_next에서 사용한 request와 router_handler 이후에 사용하는 request가 별개의 객체라는 것을 알 수 있습니다.

-> 별개의 객체라면 _body 멤버 변수에 저장한 request body값을 넘겨주지 못했다는 것

-> scope값을 가지고 새로운 request 객체를 생성했으므로, 실제로 uvicorn의 receive를 통해 이미 stream body를 소모했지만

-> 새로운 request 객체는 그 사실을 알 수 없고 다시 receive를 재호출하여 block되어 버린 버그 상태입니다.

 

해결 방법으로는 user_middleware(커스텀 미들웨어) 레이어 이후로는 ASGIapp 인터페이스를 사용하지말고 request로 파라미터를 통일하여 넘기는 방식으로 수정이 필요할 듯 보이는데, 포스팅 기준일자로 최신 FastAPI까지 딱히 적용되진 않은 것 같습니다.

 

중간에 Body값을 확인하거나 펼쳐야되는 이슈가 있다면 미들웨어를 사용하지 않고

route_handler 계층 수준에서 커스텀하게 APIroute 파생 클래스를 생성하여 APIrouter에 끼워서 사용하는 방법이 있습니다.

class CustomAPIRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            try:
                requestBody = await request.body()

                uvicornAccessLogger.info("BODY: {}".format(requestBody))

            except JSONDecodeError as e:
                uvicornAccessLogger.info("[JSONdecode ERROR] BODY:\n{}".format(e.doc))

            except Exception as e:
                uvicornAccessLogger.exception("ERROR: " + str(e))


            response: Response = await route_handler(request)

            return response

        return custom_route_handler
        

router = APIRouter(route_class=CustomAPIRoute)

 

 

 

 

 

 

 

 

  

 

 

아래 uvicorn with Fast API의 ASGIapp 계층 분석은 다시 정리해서 포스팅 예정

FAST API app flow


uvicorn .run (app)

-> Fast API(Starlette).__call__(scope, receive, send)
-> middleware_stack(scope, receive, send)
-> app = Middleware(app=app) for app in middleware_list   # 최초 app은 APIRouter(Router)
-> app.__call__(scope, receive, send) # 미들웨어 하나씩 실행
	-> user_middleware(BaseHTTPMiddleware)가 있는 경우, 
	-> user_middleware.__call__(scope, receive, send)
	-> response = user_middleware(self).call_next(request)
	-> user_middleware의 call_next()는 send = queue.get을 통해 처리, 여기 미들웨어에서도 app을 계속 호출

-> 마지막은 미들웨어 스택의 app = APIRouter.__call__(scope, receive, send)
-> APIroute(route).handle(scope, receive, send)    for route in APIRouter(self).routes if route.match(path)

# 이건 Starlette.Route.handle
-> route(self).handle(scope, receive, send)-> route(self).app(scope, receive, send) # route(self).app = request_response(self.endpoint)

# Fast API의 APIRoute.handle
->  handle(scope, receive, send)-> self.app = request_response(self.get_route_handler())
get_route_handler에서 endpoint의 파라미터를 분석하여  app(request) 형태로 wrapping 함
즉 Fast API endpoint 함수의 파라미터들을 request하나만으로 call할수 있는 형태로 변경 Starlette와 호환되도록


-> request_response(func)
-> response = func(request)     # Fast API의 func = get_route_handler()
-> await response(scope, receive, send)

-> response.__call__(scope, receive, send)
-> 최종 웹서버로 보내는 send 실행 
	## user_middleware 사용한 경우
	-> user_middleware.call_next(request)
	-> user_middleware는 하위의 app(APIRoute)의 send는 Queue에 넣고
	-> queue.get으로 얻은 body(content)를 generator Iter로 response 생성
        -> return response = StreamingResponse(…, content=body_stream()) # call_next return!!
	-> await response(scope,receive, send)
        -> Send! (상위 미들웨어에서 물려받은 send)

 

 

 

 

 

 

 

댓글