starlette는 경량 ASGI를 구현할 수 있는 웹프레임워크이며,
FastAPI는 starlette를 wraping하여 http 서비스(웹 혹은 API)를 간단하게 만들 수 있는 웹프레임워크입니다.
FastAPI에서는 route에 명시한 함수(endpoint)에 도달하기전과 통과한후 사이에 로직을 추가할 수 있는 미들웨어 계층을 제공합니다.
미들웨어 코드 예시
from fastapi import FastAPI
app = FastAPI(
title="Hello"
)
@app.middleware('http')
async def middleware(request: Request, call_next):
requestEndpoint = str(request.method) + " " + str(request.url.path)
client = str(request.client.host) + ":" + str(request.client.port)
# 미들웨어에서 body는 사용하면 안됨.
# body = await request.body()
print(requestEndpoint, client)
response: Response = await call_next(request)
return response
위 예시처럼 미들웨어를 통해서 call_next의 전후로 로깅등 필요한 로직을 추가할 수 있습니다.
다만, 현재 미들웨어는 http만 제공하며 미들웨어에서 body를 load하면 정상적으로 동작하지 않습니다.
(테스트 해보지 않았지만 uvicorn(웹서버)에서 TLS 처리후 Fast API app에 http 프로토콜이 넘어오지 않을까 추측)
call_next로 들어간 이후에 내부에서 body를 사용할때 무한 wait이 걸리거나 에러가 발생 하게 됩니다.
Request 클래스 코드 분석
# starlette.requests 파일
class Request(HTTPConnection):
def __init__(
self, scope: Scope, receive: Receive = empty_receive, send: Send = empty_send
):
super().__init__(scope)
assert scope["type"] == "http"
self._receive = receive
self._send = send
self._stream_consumed = False
self._is_disconnected = False
@property
def method(self) -> str:
return self.scope["method"]
@property
def receive(self) -> Receive:
return self._receive
async def stream(self) -> typing.AsyncGenerator[bytes, None]:
if hasattr(self, "_body"):
yield self._body
yield b""
return
if self._stream_consumed:
raise RuntimeError("Stream consumed")
self._stream_consumed = True
while True:
message = await self._receive()
if message["type"] == "http.request":
body = message.get("body", b"")
if body:
yield body
if not message.get("more_body", False):
break
elif message["type"] == "http.disconnect":
self._is_disconnected = True
raise ClientDisconnect()
yield b""
async def body(self) -> bytes:
if not hasattr(self, "_body"):
chunks = []
async for chunk in self.stream():
chunks.append(chunk)
self._body = b"".join(chunks)
return self._body
FastAPI(내부적으로 starlette)의 body를 load하는 동작
- 45줄의 body() 함수는 stream(22줄 함수) -> _receive(33줄 호출)을 통해 request body 값 획득
- _body 멤버 변수에 저장, _stream_consumed = True
- 한번 소비된 request body는 _body에서 꺼내서 재사용
미들웨어 계층에서 request.body() 사용시 변수 값을 확인해보면(로깅 코드를 삽입 or 디버깅 브레이크 포인트)
- 미들웨어 계층에서는 request.body()를 호출후 _body에 저장된 상태 ok
- call_next 내부에 동작되는 함수들에서는 request._body와 _stream_consumed이 존재하지 않음
- call_next, APIroute 클래스 계층, route(endpoint)함수에서 확인
- _receive()함수에서 무한 wait을 발생
receive 함수의 동작
# uvicorn.protocals.http.h11_impl.py
async def receive(self):
if self.waiting_for_100_continue and not self.transport.is_closing():
event = h11.InformationalResponse(
status_code=100, headers=[], reason="Continue"
)
output = self.conn.send(event)
self.transport.write(output)
self.waiting_for_100_continue = False
if not self.disconnected and not self.response_complete:
self.flow.resume_reading()
await self.message_event.wait()
self.message_event.clear()
if self.disconnected or self.response_complete:
message = {"type": "http.disconnect"}
else:
message = {
"type": "http.request",
"body": self.body,
"more_body": self.more_body,
}
self.body = b""
return message
uvicorn 웹서버의 receive 함수를 살펴보면 14줄에서 asyncio.Event() 객체인 message_event.wait() 로직은 message_event.set()이 되지 않으면 계속 block되는 함수입니다.
uvicorn은 client에서 더 이상 받을 data가 없는 경우, message_event.set()을 사용 할 수 없는 분기문으로 빠지고 이미 들어온 Task는 callback(response 후처리)을 통해서 비동기적으로 마무리 됩니다.
즉 이미 커스텀 미들웨어에서 receive를 호출후 다음 ASGIapp 계층에서 receive를 다시 호출하게되면 message_event.set()이 더 이상 발생하지 않기 때문에 message_event.wait()에서 block되어 버립니다.
그래서 FastAPI(Starlette)의 Request 객체는 한번 body를 얻고나면 receive를 재호출하지 않고 저장한 _body 멤버를 재사용하게 설계하였습니다.
그렇다면 도대체 request.body()를 호출후에 어째서 저장한 _body를 재사용하지 않고 receive 함수를 재호출하는 걸까요?
ASGI 데이터 흐름
ASGIapp(ASGI 인터페이스를 따르는 객체)는 기본적으로 scope, receive, send 파라미터를 갖고 있습니다.
Fast API (Starlette)는 최초의 ASGIapp 객체이며 self.app = ASGIapp(app=~~) 방식으로 하위에 ASGIapp 객체를 갖고
계층(layer) 구조를 갖습니다.
scope : 전반적인 메타데이터를 가지고 있는 dict 구조체입니다. (header, url path, protocal type, asgi version, type ...etc...)
receive : 웹서버쪽에서 데이터(request body)를 받는 함수 # type에 따라 다를 수 있음
send : 웹서버로 데이터(response)를 보내는 함수 # type에 따라 다를 수 있음
예시
uvicorn.run(app=FastAPI) -> FastAPI(scope, receive, send)
-> middleware(scope, receive, send) -> call_next(request)
-> APIRouter(scope, receive, send) -> APIRoute(scope, receive, send)
-> route_handler(request) -> endpoint(**kwargs)
call_next 함수 코드 분석
# starlette.middleware.base
# 28줄부터
async def call_next(self, request: Request) -> Response:
loop = asyncio.get_event_loop()
queue: "asyncio.Queue[typing.Optional[Message]]" = asyncio.Queue()
scope = request.scope
receive = request.receive
send = queue.put
async def coro() -> None:
try:
await self.app(scope, receive, send)
finally:
await queue.put(None)
task = loop.create_task(coro())
message = await queue.get()
if message is None:
task.result()
raise RuntimeError("No response returned.")
assert message["type"] == "http.response.start"
async def body_stream() -> typing.AsyncGenerator[bytes, None]:
while True:
message = await queue.get()
if message is None:
break
assert message["type"] == "http.response.body"
yield message.get("body", b"")
task.result()
response = StreamingResponse(
status_code=message["status"], content=body_stream()
)
response.raw_headers = message["headers"]
return response
- 10줄 -> send를 queue.put 함수로 변경
- 12줄 -> 다음 계층의 app(보통 fastapi.routing.APIRouter)을 실행하는 코루틴 정의
- 18줄 -> 이벤트 루프에 넣고 실행, 다음 계층의 app은 send가 웹서버에 보내는 것이 아닌 queue에 쌓음
- 26줄 -> queue에 쌓인 send(message)값을 iterable하게 반환하는 generator 함수 정의
- 36줄 -> response 객체 생성, return response
FastAPI의 ASGIapp 계층의 데이터 흐름을 보면 뜬금 없이 ASGIapp들 사이에 call_next는 ASGI 인터페이스를 따르지 않고
Starlette 자체적으로 사용하는 Request 객체를 사용하고 있습니다.
물론 Request도 내부적으로 scope, receive, send를 갖고 있으나 문제는 ASGI 표준에서 scope는 body값을 갖지 않습니다.
즉 계층이 넘어가면서 body를 넘겨주지 못하며 call_next에서 사용한 request와 router_handler 이후에 사용하는 request가 별개의 객체라는 것을 알 수 있습니다.
-> 별개의 객체라면 _body 멤버 변수에 저장한 request body값을 넘겨주지 못했다는 것
-> scope값을 가지고 새로운 request 객체를 생성했으므로, 실제로 uvicorn의 receive를 통해 이미 stream body를 소모했지만
-> 새로운 request 객체는 그 사실을 알 수 없고 다시 receive를 재호출하여 block되어 버린 버그 상태입니다.
해결 방법으로는 user_middleware(커스텀 미들웨어) 레이어 이후로는 ASGIapp 인터페이스를 사용하지말고 request로 파라미터를 통일하여 넘기는 방식으로 수정이 필요할 듯 보이는데, 포스팅 기준일자로 최신 FastAPI까지 딱히 적용되진 않은 것 같습니다.
중간에 Body값을 확인하거나 펼쳐야되는 이슈가 있다면 미들웨어를 사용하지 않고
route_handler 계층 수준에서 커스텀하게 APIroute 파생 클래스를 생성하여 APIrouter에 끼워서 사용하는 방법이 있습니다.
class CustomAPIRoute(APIRoute):
def get_route_handler(self) -> Callable:
route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
try:
requestBody = await request.body()
uvicornAccessLogger.info("BODY: {}".format(requestBody))
except JSONDecodeError as e:
uvicornAccessLogger.info("[JSONdecode ERROR] BODY:\n{}".format(e.doc))
except Exception as e:
uvicornAccessLogger.exception("ERROR: " + str(e))
response: Response = await route_handler(request)
return response
return custom_route_handler
router = APIRouter(route_class=CustomAPIRoute)
아래 uvicorn with Fast API의 ASGIapp 계층 분석은 다시 정리해서 포스팅 예정
FAST API app flow
uvicorn .run (app)
-> Fast API(Starlette).__call__(scope, receive, send)
-> middleware_stack(scope, receive, send)
-> app = Middleware(app=app) for app in middleware_list # 최초 app은 APIRouter(Router)
-> app.__call__(scope, receive, send) # 미들웨어 하나씩 실행
-> user_middleware(BaseHTTPMiddleware)가 있는 경우,
-> user_middleware.__call__(scope, receive, send)
-> response = user_middleware(self).call_next(request)
-> user_middleware의 call_next()는 send = queue.get을 통해 처리, 여기 미들웨어에서도 app을 계속 호출
-> 마지막은 미들웨어 스택의 app = APIRouter.__call__(scope, receive, send)
-> APIroute(route).handle(scope, receive, send) for route in APIRouter(self).routes if route.match(path)
# 이건 Starlette.Route.handle
-> route(self).handle(scope, receive, send)-> route(self).app(scope, receive, send) # route(self).app = request_response(self.endpoint)
# Fast API의 APIRoute.handle
-> handle(scope, receive, send)-> self.app = request_response(self.get_route_handler())
get_route_handler에서 endpoint의 파라미터를 분석하여 app(request) 형태로 wrapping 함
즉 Fast API endpoint 함수의 파라미터들을 request하나만으로 call할수 있는 형태로 변경 Starlette와 호환되도록
-> request_response(func)
-> response = func(request) # Fast API의 func = get_route_handler()
-> await response(scope, receive, send)
-> response.__call__(scope, receive, send)
-> 최종 웹서버로 보내는 send 실행
## user_middleware 사용한 경우
-> user_middleware.call_next(request)
-> user_middleware는 하위의 app(APIRoute)의 send는 Queue에 넣고
-> queue.get으로 얻은 body(content)를 generator Iter로 response 생성
-> return response = StreamingResponse(…, content=body_stream()) # call_next return!!
-> await response(scope,receive, send)
-> Send! (상위 미들웨어에서 물려받은 send)
'Python > FastAPI' 카테고리의 다른 글
[Fast API][Starlette] keepalive request(요청) 지연 현상 / background tasks / slow request (0) | 2023.07.17 |
---|
댓글