Microservices architecture splits a large application into small, independent services. This guide builds services with FastAPI and implements inter-service communication with message queues and gRPC.
Microservices Project Structure
Each service has its own codebase, database, and deployment pipeline. Shared schemas are maintained in a separate package.
microservices/
├── user-service/
│   ├── app/
│   │   ├── main.py
│   │   ├── models.py
│   │   ├── schemas.py
│   │   └── routers/
│   ├── Dockerfile
│   └── requirements.txt
├── order-service/
│   ├── app/
│   │   ├── main.py
│   │   └── consumers.py    # Message queue consumer
│   └── Dockerfile
├── shared/
│   └── schemas/            # Common Pydantic schemas
└── docker-compose.yml
FastAPI Microservice Design
Each service has a single responsibility, exposes a health check endpoint, and uses dependency injection for shared resources such as database sessions.
# Depends, Session, and get_db support dependency injection in route handlers
# (they are not used directly in this file).
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session

from .database import get_db
from .routers import users

app = FastAPI(
    title="User Service",
    version="1.0.0"
)

# Health check endpoint
@app.get("/health")
async def health_check():
    return {"status": "healthy"}

# Register router
app.include_router(users.router, prefix="/api/v1/users")
Inter-Service HTTP Communication
Use the httpx async client to call other services. Configure retry logic and timeouts so that a slow or failing dependency does not block the caller indefinitely.
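import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

USER_SERVICE_URL = "http://user-service:8000"

# Retry up to 3 attempts, waiting exponentially longer between attempts.
@retry(stop=stop_after_attempt(3), wait=wait_exponential())
async def get_user(user_id: int):
    # The 5-second timeout bounds how long a single attempt can take.
    async with httpx.AsyncClient(timeout=5.0) as client:
        response = await client.get(
            f"{USER_SERVICE_URL}/api/v1/users/{user_id}"
        )
        # Raises httpx.HTTPStatusError on 4xx/5xx, which also triggers a retry.
        response.raise_for_status()
        return response.json()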
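To make the retry behaviour concrete, here is a minimal standard-library sketch of the idea behind retrying with exponential backoff. It is not how tenacity is implemented, and its timing differs from tenacity's defaults. The names `retry_async`, `base_delay`, and `make_flaky` are hypothetical and exist only for this illustration.

```python
import asyncio


async def retry_async(func, attempts=3, base_delay=0.01):
    """Call func() up to `attempts` times, doubling the delay after each failure."""
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return await func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error to the caller
            await asyncio.sleep(delay)
            delay *= 2  # exponential backoff


def make_flaky(failures):
    """Build an async function that fails `failures` times, then succeeds (demo only)."""
    state = {"calls": 0}

    async def flaky():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("simulated outage")
        return {"id": 1, "calls": state["calls"]}

    return flaky


if __name__ == "__main__":
    # Two simulated failures, then success on the third attempt.
    print(asyncio.run(retry_async(make_flaky(2))))
```

Backoff gives a struggling service time to recover instead of hitting it again immediately. In practice it is usually worth retrying only on errors that can plausibly succeed later (timeouts, connection errors, 5xx responses) rather than on every exception.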
RabbitMQ Message Queue
RabbitMQ handles asynchronous event processing. The aio-pika library provides asynchronous message publishing and consumption.
import aio_pika
import json

async def publish_event(event_type: str, data: dict):
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@rabbitmq/"
    )
    async with connection:
        channel = await connection.channel()
        # Topic exchange: messages are routed by matching the routing key
        exchange = await channel.declare_exchange(
            "events", aio_pika.ExchangeType.TOPIC
        )
        message = aio_pika.Message(
            body=json.dumps(data).encode()
        )
        # The event type doubles as the routing key
        await exchange.publish(message, routing_key=event_type)
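import asyncio

async def consume_events():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@rabbitmq/"
    )
    async with connection:
        channel = await connection.channel()
        # Bind the queue to the "events" exchange; without a binding,
        # messages published there never reach this queue.
        exchange = await channel.declare_exchange(
            "events", aio_pika.ExchangeType.TOPIC
        )
        queue = await channel.declare_queue("order_events")
        # Assumption: order events use routing keys such as "order.created".
        await queue.bind(exchange, routing_key="order.*")

        async def callback(message: aio_pika.IncomingMessage):
            async with message.process():  # acknowledges on success
                data = json.loads(message.body)
                print(f"Order event: {data}")

        await queue.consume(callback)
        await asyncio.Future()  # keep the consumer running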
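The publisher above sends events to a topic exchange and uses the event type as the routing key. A queue only receives those events if it is bound to the exchange with a matching pattern. The sketch below is a simplified model of topic matching (`*` matches exactly one dot-separated word, `#` matches zero or more). It is not RabbitMQ's implementation, and the function name `topic_matches` and the routing keys such as `order.created` are hypothetical.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP-style topic binding pattern matches a routing key."""
    p_words = pattern.split(".")
    k_words = routing_key.split(".")

    def match(pi: int, ki: int) -> bool:
        if pi == len(p_words):
            # Pattern exhausted: match only if the key is exhausted too
            return ki == len(k_words)
        if p_words[pi] == "#":
            # "#" may absorb zero or more words of the key
            return any(match(pi + 1, k) for k in range(ki, len(k_words) + 1))
        if ki == len(k_words):
            return False
        if p_words[pi] == "*" or p_words[pi] == k_words[ki]:
            return match(pi + 1, ki + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    for key in ("order.created", "order.created.eu", "user.created"):
        print(key, topic_matches("order.*", key), topic_matches("order.#", key))
```

Under this model, a queue bound with `order.*` would see `order.created` but not `order.created.eu`, while `order.#` would see both.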
gRPC Service Implementation
gRPC is a high-performance RPC framework built on Protocol Buffers and HTTP/2. Its binary encoding is typically more compact than JSON over a REST API, and the generated stubs give you typed request and response messages.
syntax = "proto3";

service UserService {
    rpc GetUser (UserRequest) returns (UserResponse);
}

message UserRequest {
    int32 user_id = 1;
}

message UserResponse {
    int32 id = 1;
    string name = 2;
    string email = 3;
}
import asyncio

import grpc
import user_pb2, user_pb2_grpc  # generated from the .proto definition

class UserServicer(user_pb2_grpc.UserServiceServicer):
    async def GetUser(self, request, context):
        # `db` is assumed to be an async data-access object defined elsewhere
        user = await db.get_user(request.user_id)
        return user_pb2.UserResponse(
            id=user.id,
            name=user.name,
            email=user.email
        )

async def serve():
    server = grpc.aio.server()
    user_pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), server)
    # Insecure port: no TLS, so use it only inside a trusted network
    server.add_insecure_port("[::]:50051")
    await server.start()
    await server.wait_for_termination()

if __name__ == "__main__":
    asyncio.run(serve())
✨ Microservices Key Patterns
- Service Discovery: Manage service locations with tools such as Consul or etcd
- Circuit Breaker: Isolate and recover from failing services
- API Gateway: Single entry point, auth, load balancing
- Saga Pattern: Distributed transaction handling
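As an illustration of the Circuit Breaker item above, here is a minimal in-process sketch. After a number of consecutive failures the breaker "opens" and rejects calls immediately, and after a timeout it lets one trial call through. The class names, the thresholds, and the injectable `clock` parameter are assumptions made for this example. Production code would more likely rely on an established library or on infrastructure such as a service mesh.

```python
import time


class CircuitOpenError(Exception):
    """Raised when the breaker rejects a call without contacting the service."""


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock  # injectable so the timeout can be tested without sleeping
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; skipping call")
            # Timeout elapsed: half-open state, allow one trial call through
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # open (or re-open) the circuit
            raise
        # A success closes the circuit and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

A caller would wrap each outbound request, for example `breaker.call(fetch_user, 42)` where `fetch_user` is a hypothetical function, and treat `CircuitOpenError` as a signal to fall back to a cached or default response.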
⚠️ Common Mistakes
- Long chains of synchronous calls between services
- Sharing a database between services, which increases coupling
- Ignoring distributed transactions
- Poorly drawn service boundaries (services too small or too large)
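Distributed transactions are one of the pitfalls listed above, and the Saga pattern is a common way to handle them. Each step has a compensating action, and when a step fails the completed steps are undone in reverse order. The sketch below runs everything in one process to show the control flow only. The function names and the order, stock, and payment steps are hypothetical. A real saga would also need durable state, idempotent compensations, and messaging between services.

```python
def run_saga(steps):
    """Run (action, compensation) pairs in order.

    If an action raises, run the compensations of the already completed
    steps in reverse order and return False. Return True if every step succeeds.
    """
    completed = []
    for action, compensation in steps:
        try:
            action()
        except Exception:
            for undo in reversed(completed):
                undo()
            return False
        completed.append(compensation)
    return True


def place_order_demo():
    """Simulate an order whose payment step fails (demo data only)."""
    log = []

    def charge_payment():
        raise RuntimeError("payment declined")

    steps = [
        (lambda: log.append("order created"), lambda: log.append("order cancelled")),
        (lambda: log.append("stock reserved"), lambda: log.append("stock released")),
        (charge_payment, lambda: log.append("payment refunded")),
    ]
    ok = run_saga(steps)
    return ok, log


if __name__ == "__main__":
    print(place_order_demo())
```

In the demo the payment step fails, so the stock reservation and the order are rolled back in that order. No refund is issued because the payment never completed.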
