Files
CosScene/server/app/main.py
T
shiran f1ce992d69 feat(server): 添加默认管理员自动创建功能
- 在 .env.example 中添加默认管理员相关配置项
- 在 docker-compose.yml 中添加默认管理员环境变量映射
- 在 server/app/core/config.py 中定义默认管理员配置
- 创建 server/app/db/bootstrap.py 文件实现默认管理员创建逻辑
- 在 server/app/main.py 的生命周期中集成默认管理员确保功能
- 更新 README.md 文档说明新的管理员配置方式

新配置项包括:DEFAULT_ADMIN_ENABLED、DEFAULT_ADMIN_PHONE、
DEFAULT_ADMIN_EMAIL、DEFAULT_ADMIN_PASSWORD、DEFAULT_ADMIN_NICKNAME
和 DEFAULT_ADMIN_SYNC_PASSWORD。
2026-05-09 19:00:02 +08:00

197 lines
6.3 KiB
Python

import asyncio
import logging
from contextlib import asynccontextmanager
from pathlib import Path
import redis.asyncio as aioredis
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from geoalchemy2.functions import ST_X, ST_Y
from sqlalchemy import select
from app.api.v1.router import v1_router
from app.core.config import settings
from app.core.deps import get_current_active_user, get_db
from app.core.storage import init_storage
from app.db.bootstrap import ensure_default_admin
from app.db.migrations import run_startup_migrations
from app.models.spot import Spot
from app.models.tag import Tag
from app.models.user import User
from app.schemas.admin import AdminSpotDetailItem
from sqlalchemy.ext.asyncio import AsyncSession
if settings.SENTRY_DSN:
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
sentry_sdk.init(
dsn=settings.SENTRY_DSN,
integrations=[FastApiIntegration(), SqlalchemyIntegration()],
traces_sample_rate=0.2,
send_default_pii=False,
)
from app.core.logging_config import setup_logging
setup_logging(
json_format=settings.LOG_JSON,
level=getattr(logging, settings.LOG_LEVEL.upper(), logging.INFO),
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
await asyncio.to_thread(run_startup_migrations)
await asyncio.to_thread(ensure_default_admin)
app.state.redis = aioredis.from_url(settings.REDIS_URL, decode_responses=True)
app.state.storage = init_storage()
yield
if app.state.redis:
await app.state.redis.aclose()
app = FastAPI(title="次元取景器 API", version="0.1.0", lifespan=lifespan)
def _assert_admin_role(user: User) -> None:
if user.role not in ("admin", "moderator"):
raise HTTPException(
status_code=403,
detail="Admin permission required",
)
@app.get("/api/v1/admin/spot-tag-options")
async def admin_spot_tag_options_fallback(
keyword: str = "",
limit: int = 50,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
):
_assert_admin_role(current_user)
stmt = select(Tag.id, Tag.name).where(Tag.is_active.is_(True)).order_by(Tag.usage_count.desc(), Tag.id.desc()).limit(limit)
if keyword:
like = f"%{keyword.strip()}%"
stmt = (
select(Tag.id, Tag.name)
.where(Tag.is_active.is_(True), Tag.name.ilike(like))
.order_by(Tag.usage_count.desc(), Tag.id.desc())
.limit(limit)
)
rows = (await db.execute(stmt)).all()
return [{"id": int(r[0]), "title": str(r[1])} for r in rows]
@app.get("/api/v1/admin/spots/{spot_id}", response_model=AdminSpotDetailItem)
async def admin_spot_detail_fallback(
spot_id: int,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
):
_assert_admin_role(current_user)
result = await db.execute(
select(Spot, ST_X(Spot.location).label("lng"), ST_Y(Spot.location).label("lat"))
.where(Spot.id == spot_id)
)
row = result.one_or_none()
if not row:
return AdminSpotDetailItem(
id=spot_id,
title="",
city="",
longitude=0.0,
latitude=0.0,
description=None,
transport=None,
best_time=None,
difficulty=None,
is_free=True,
price_min=None,
price_max=None,
audit_status="deleted",
reject_reason="spot_not_found",
creator_id=current_user.id,
tag_ids=[],
image_urls=[],
images=[],
)
spot, lng, lat = row
image_urls = [img.image_url for img in sorted(spot.images, key=lambda x: x.sort_order)]
tag_ids = [tag.id for tag in spot.tags]
return AdminSpotDetailItem(
id=spot.id,
title=spot.title,
city=spot.city,
longitude=lng if lng is not None else spot.longitude,
latitude=lat if lat is not None else spot.latitude,
description=spot.description,
transport=spot.transport,
best_time=spot.best_time,
difficulty=spot.difficulty,
is_free=spot.is_free,
price_min=float(spot.price_min) if spot.price_min is not None else None,
price_max=float(spot.price_max) if spot.price_max is not None else None,
audit_status=spot.audit_status,
reject_reason=spot.reject_reason,
creator_id=spot.creator_id,
tag_ids=tag_ids,
image_urls=image_urls,
images=[
{
"id": img.id,
"spot_id": img.spot_id,
"image_url": img.image_url,
"is_cover": img.is_cover,
"sort_order": img.sort_order,
"created_at": img.created_at,
}
for img in sorted(spot.images, key=lambda x: x.sort_order)
],
)
@app.middleware("http")
async def log_5xx_response(request: Request, call_next):
response = await call_next(request)
if response.status_code >= 500:
logger.error("Server 5xx response %s %s -> %s", request.method, request.url.path, response.status_code)
return response
@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
logger.error(
"Unhandled exception on %s %s: %s",
request.method,
request.url.path,
repr(exc),
exc_info=(type(exc), exc, exc.__traceback__),
)
return JSONResponse(
status_code=500,
content={"detail": "Internal Server Error"},
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
if settings.STORAGE_BACKEND == "local":
uploads_dir = Path(settings.LOCAL_STORAGE_PATH)
uploads_dir.mkdir(parents=True, exist_ok=True)
app.mount("/uploads", StaticFiles(directory=str(uploads_dir)), name="uploads")
app.include_router(v1_router, prefix="/api/v1")
@app.get("/")
async def health_check():
return {"status": "ok", "name": "次元取景器 API"}