341 lines
11 KiB
Python
341 lines
11 KiB
Python
import asyncio
|
|
import base64
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from decimal import Decimal, InvalidOperation
|
|
from typing import Any, Dict, Optional
|
|
from uuid import uuid4
|
|
|
|
import httpx
|
|
from fastapi import FastAPI, Header, HTTPException, Request
|
|
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
|
|
from jinja2 import Template
|
|
|
|
# In-memory stores; nothing is persisted outside this process.
# Idempotence-Key header value -> {"payment_id": <id of the payment it created>}.
idempotence_db: Dict[str, Dict[str, Any]] = {}

# Payment id -> stored payment record.
payments_db: Dict[str, Dict[str, Any]] = {}

# Application object that the route decorators in this module register on.
app = FastAPI(title="YooKassa Mock v2")
|
|
|
|
# Target URL for payment notifications; empty means no webhook is sent.
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "").strip()

# Host/port used to build confirmation URLs; when the host is empty the
# incoming request's own base URL is used instead.
MOCK_HOST = os.environ.get("MOCK_HOST", "").strip()
MOCK_PORT = os.environ.get("MOCK_PORT", "8081").strip()

# Basic-auth enforcement is opt-in via a truthy flag value.
MOCK_REQUIRE_AUTH = (
    os.environ.get("MOCK_REQUIRE_AUTH", "0").strip().lower()
    in {"1", "true", "yes", "on"}
)
MOCK_SHOP_ID = os.environ.get("MOCK_SHOP_ID", "").strip()
MOCK_SECRET_KEY = os.environ.get("MOCK_SECRET_KEY", "").strip()

# Webhook delivery tuning: at least one attempt is always made.
WEBHOOK_RETRY_COUNT = max(1, int(os.environ.get("WEBHOOK_RETRY_COUNT", "3")))
WEBHOOK_RETRY_DELAY_SEC = float(os.environ.get("WEBHOOK_RETRY_DELAY_SEC", "1"))
WEBHOOK_DELAY_SEC = float(os.environ.get("WEBHOOK_DELAY_SEC", "0"))
|
|
|
|
# HTML checkout page with three forms that POST an ``action`` of
# success / cancel / fail to /process/<payment_id>.
#
# Autoescaping is enabled because ``description`` and ``currency`` are taken
# from the create-payment request body; without it they would be written
# into the page as raw HTML.
CHECKOUT_TEMPLATE = Template(
    """
<!doctype html>
<html lang="ru">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1">
<title>YooKassa Mock Checkout</title>
<style>
body {
font-family: Arial, sans-serif;
background: #f6f7fb;
margin: 0;
padding: 40px 20px;
color: #222;
}
.card {
max-width: 560px;
margin: 0 auto;
background: #fff;
border-radius: 12px;
padding: 32px;
box-shadow: 0 8px 24px rgba(0,0,0,0.08);
}
h1 { margin-top: 0; font-size: 24px; }
.amount { font-size: 28px; font-weight: 700; margin: 16px 0 24px; }
.row { margin: 8px 0; color: #555; }
.actions {
display: grid;
gap: 12px;
margin-top: 24px;
}
button {
width: 100%;
border: 0;
border-radius: 10px;
padding: 14px 18px;
font-size: 16px;
font-weight: 700;
cursor: pointer;
background: #006efc;
color: white;
}
button.secondary { background: #6b7280; }
button.danger { background: #b91c1c; }
code {
background: #f3f4f6;
padding: 2px 6px;
border-radius: 6px;
}
</style>
</head>
<body>
<div class="card">
<h1>Тестовая оплата</h1>

<div class="row">Платеж: <code>{{ payment_id }}</code></div>
<div class="row">Статус: <code>{{ status }}</code></div>
<div class="row">Сценарий по умолчанию: <code>{{ scenario }}</code></div>
<div class="row">Описание: <code>{{ description }}</code></div>

<div>Сумма к оплате:</div>
<div class="amount">{{ amount }} {{ currency }}</div>

<div class="actions">
<form method="post" action="/process/{{ payment_id }}">
<input type="hidden" name="action" value="success">
<button type="submit">Оплатить успешно</button>
</form>

<form method="post" action="/process/{{ payment_id }}">
<input type="hidden" name="action" value="cancel">
<button class="secondary" type="submit">Отменить оплату</button>
</form>

<form method="post" action="/process/{{ payment_id }}">
<input type="hidden" name="action" value="fail">
<button class="danger" type="submit">Ошибка оплаты</button>
</form>
</div>
</div>
</body>
</html>
""",
    autoescape=True,
)
|
|
|
|
|
|
def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string.

    The value has millisecond precision and uses a ``Z`` suffix in place
    of the ``+00:00`` offset.
    """
    stamp = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
    return stamp.replace("+00:00", "Z")
|
|
|
|
|
|
def parse_amount_value(value: Any) -> str:
    """Validate ``amount.value`` and normalise it to a two-decimal string.

    Args:
        value: Raw value from the request body; it is converted with
            ``str()`` before being parsed as a ``Decimal``.

    Returns:
        The amount quantized to two decimal places, e.g. ``"10.00"``.

    Raises:
        HTTPException: 400 when the value cannot be parsed or quantized,
            is not a finite number, or is not greater than zero.
    """
    try:
        normalized = Decimal(str(value)).quantize(Decimal("0.01"))
    except (InvalidOperation, ValueError) as exc:
        raise HTTPException(status_code=400, detail="invalid amount.value") from exc

    # A quiet NaN passes through quantize() unchanged, and ordering
    # comparisons against NaN raise InvalidOperation; reject it here so the
    # caller gets a 400 instead of an unhandled error.
    if not normalized.is_finite():
        raise HTTPException(status_code=400, detail="invalid amount.value")

    if normalized <= 0:
        raise HTTPException(status_code=400, detail="amount.value must be greater than 0")

    return f"{normalized:.2f}"
|
|
|
|
|
|
def build_base_url(request: Request) -> str:
    """Return the base URL used to build links back to this service.

    When ``MOCK_HOST`` is configured the URL is ``http://MOCK_HOST:MOCK_PORT``;
    otherwise it is the incoming request's base URL with any trailing
    slash removed.
    """
    if not MOCK_HOST:
        return str(request.base_url).rstrip("/")
    return f"http://{MOCK_HOST}:{MOCK_PORT}"
|
|
|
|
|
|
def build_payment_response(payment: Dict[str, Any], request: Request) -> Dict[str, Any]:
    """Build the JSON-serialisable API representation of a stored payment.

    Args:
        payment: Stored payment record, as created by ``create_payment``.
        request: Current request, used to derive the base URL of the
            confirmation link.

    Returns:
        A dict with the payment's public fields, a redirect-type
        ``confirmation`` whose URL points at ``/checkout``, a computed
        ``refundable`` flag, and ``test`` set to ``True``.
    """
    root = build_base_url(request)
    pid = payment["id"]

    amount = {
        "value": payment["amount"]["value"],
        "currency": payment["amount"]["currency"],
    }
    confirmation = {
        "type": "redirect",
        "confirmation_url": f"{root}/checkout?payment_id={pid}",
    }
    recipient = {
        "account_id": payment["recipient"]["account_id"],
        "gateway_id": payment["recipient"]["gateway_id"],
    }
    # Refundable only once the payment has succeeded and capture is truthy.
    if payment["status"] == "succeeded":
        is_refundable = bool(payment["capture"])
    else:
        is_refundable = False

    return {
        "id": pid,
        "status": payment["status"],
        "paid": payment["paid"],
        "amount": amount,
        "confirmation": confirmation,
        "created_at": payment["created_at"],
        "description": payment["description"],
        "metadata": payment["metadata"],
        "capture": payment["capture"],
        "recipient": recipient,
        "refundable": is_refundable,
        "test": True,
    }
|
|
|
|
|
|
def require_basic_auth(authorization: Optional[str]) -> None:
    """Enforce HTTP Basic credentials when ``MOCK_REQUIRE_AUTH`` is enabled.

    Args:
        authorization: Raw ``Authorization`` header value, or ``None`` when
            the header is absent.

    Raises:
        HTTPException: 401 when the header is missing or not a Basic
            header, when the credentials cannot be decoded into
            ``user:password``, or when they do not match
            ``MOCK_SHOP_ID`` / ``MOCK_SECRET_KEY``.
    """
    if not MOCK_REQUIRE_AUTH:
        return

    # HTTP auth scheme names are case-insensitive, so "basic" and "BASIC"
    # are accepted as well as "Basic".
    scheme, separator, encoded = (authorization or "").partition(" ")
    if not separator or scheme.lower() != "basic":
        raise HTTPException(status_code=401, detail="basic authorization required")

    try:
        decoded = base64.b64decode(encoded).decode("utf-8")
        username, password = decoded.split(":", 1)
    except ValueError as exc:
        # ValueError covers invalid base64 (binascii.Error), invalid UTF-8
        # (UnicodeDecodeError) and a missing ":" separator (unpacking).
        raise HTTPException(status_code=401, detail="invalid basic authorization") from exc

    if username != MOCK_SHOP_ID or password != MOCK_SECRET_KEY:
        raise HTTPException(status_code=401, detail="invalid shop credentials")
|
|
|
|
|
|
def resolve_event_name(status: str) -> str:
    """Map a payment status to the webhook event name.

    ``"succeeded"`` maps to ``"payment.succeeded"``; every other status
    maps to ``"payment.canceled"``.
    """
    return "payment.succeeded" if status == "succeeded" else "payment.canceled"
|
|
|
|
|
|
async def send_webhook(payment: Dict[str, Any]) -> None:
    """POST a payment notification to ``WEBHOOK_URL``, retrying on failure.

    Does nothing when ``WEBHOOK_URL`` is empty. Otherwise waits
    ``WEBHOOK_DELAY_SEC`` (if positive), then makes up to
    ``WEBHOOK_RETRY_COUNT`` attempts, sleeping ``WEBHOOK_RETRY_DELAY_SEC``
    between them. Delivery is best-effort: request failures are printed
    and never raised to the caller.

    Args:
        payment: Stored payment record whose current state is reported.
    """
    if not WEBHOOK_URL:
        return

    if WEBHOOK_DELAY_SEC > 0:
        await asyncio.sleep(WEBHOOK_DELAY_SEC)

    payload = {
        "event": resolve_event_name(payment["status"]),
        "type": "notification",
        "object": {
            "id": payment["id"],
            "status": payment["status"],
            "paid": payment["paid"],
            "amount": payment["amount"],
            "description": payment["description"],
            "metadata": payment["metadata"],
        },
    }

    last_error: Optional[Exception] = None
    # Create the client once and reuse it for every attempt instead of
    # building and tearing down a new one per retry.
    async with httpx.AsyncClient(timeout=10.0) as client:
        for attempt in range(1, WEBHOOK_RETRY_COUNT + 1):
            try:
                response = await client.post(WEBHOOK_URL, json=payload)
                response.raise_for_status()
            except Exception as exc:
                # Deliberately broad: a failed notification must not break
                # the payment flow that triggered it.
                last_error = exc
                print(f"Webhook attempt failed: payment_id={payment['id']} attempt={attempt} error={exc}")
                if attempt < WEBHOOK_RETRY_COUNT:
                    await asyncio.sleep(WEBHOOK_RETRY_DELAY_SEC)
            else:
                print(f"Webhook delivered: payment_id={payment['id']} attempt={attempt}")
                return

    print(f"Webhook delivery failed: payment_id={payment['id']} error={last_error}")
|
|
|
|
|
|
def _as_json_object(value: Any, field: str) -> Dict[str, Any]:
    """Return ``value`` as a dict; falsy values become ``{}``, other non-dicts are a 400."""
    if not value:
        return {}
    if not isinstance(value, dict):
        raise HTTPException(status_code=400, detail=f"{field} must be an object")
    return value


@app.post("/v3/payments")
async def create_payment(
    request: Request,
    authorization: Optional[str] = Header(default=None),
    idempotence_key: Optional[str] = Header(default=None, alias="Idempotence-Key"),
):
    """Create a pending payment, or replay the one tied to the idempotence key.

    Args:
        request: Incoming request carrying the JSON payment description.
        authorization: ``Authorization`` header, checked by
            ``require_basic_auth``.
        idempotence_key: ``Idempotence-Key`` header. A key that was already
            used returns the payment it created, without reading the body.

    Returns:
        ``JSONResponse`` with the payment representation.

    Raises:
        HTTPException: 401 on failed authentication; 400 when the
            idempotence key is missing, the body is not a JSON object,
            ``amount`` / ``confirmation`` / ``metadata`` are not objects,
            ``confirmation.return_url`` is missing, ``amount.value`` is
            invalid, or ``metadata.mock_scenario`` is unsupported.
    """
    require_basic_auth(authorization)

    if not idempotence_key:
        raise HTTPException(status_code=400, detail="Idempotence-Key header is required")

    if idempotence_key in idempotence_db:
        existing_payment_id = idempotence_db[idempotence_key]["payment_id"]
        payment = payments_db[existing_payment_id]
        return JSONResponse(content=build_payment_response(payment, request))

    # Malformed input must be reported as a client error, not surface as an
    # unhandled JSON decoding or attribute error.
    try:
        payload = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="request body must be valid JSON") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="request body must be a JSON object")

    amount_data = _as_json_object(payload.get("amount"), "amount")
    confirmation_data = _as_json_object(payload.get("confirmation"), "confirmation")
    metadata = _as_json_object(payload.get("metadata"), "metadata")

    return_url = confirmation_data.get("return_url")
    if not return_url:
        raise HTTPException(status_code=400, detail="confirmation.return_url is required")

    amount_value = parse_amount_value(amount_data.get("value"))
    currency = amount_data.get("currency", "RUB")
    capture = bool(payload.get("capture", False))
    # Description is truncated to 128 characters.
    description = str(payload.get("description") or "")[:128]

    # The default outcome shown on the checkout page is chosen via metadata.
    scenario = str(metadata.get("mock_scenario", "success")).strip().lower()
    if scenario not in {"success", "cancel", "fail"}:
        raise HTTPException(status_code=400, detail="metadata.mock_scenario must be success, cancel or fail")

    payment_id = str(uuid4())
    payment = {
        "id": payment_id,
        "status": "pending",
        "paid": False,
        "created_at": utc_now_iso(),
        "description": description,
        "metadata": metadata,
        "capture": capture,
        "return_url": return_url,
        "scenario": scenario,
        "amount": {
            "value": amount_value,
            "currency": currency,
        },
        "recipient": {
            "account_id": MOCK_SHOP_ID or "100500",
            "gateway_id": "100700",
        },
    }
    payments_db[payment_id] = payment
    idempotence_db[idempotence_key] = {"payment_id": payment_id}

    return JSONResponse(content=build_payment_response(payment, request))
|
|
|
|
|
|
@app.get("/v3/payments/{payment_id}")
async def get_payment(
    payment_id: str,
    request: Request,
    authorization: Optional[str] = Header(default=None),
):
    """Return the stored payment with the given id.

    Raises:
        HTTPException: 401 when authentication fails, 404 when no payment
            with this id is stored.
    """
    require_basic_auth(authorization)

    found = payments_db.get(payment_id)
    if found:
        return JSONResponse(content=build_payment_response(found, request))

    raise HTTPException(status_code=404, detail="payment not found")
|
|
|
|
|
|
@app.get("/checkout", response_class=HTMLResponse)
async def checkout(payment_id: str):
    """Render the HTML checkout page for a stored payment.

    Raises:
        HTTPException: 404 when no payment with this id is stored.
    """
    record = payments_db.get(payment_id)
    if not record:
        raise HTTPException(status_code=404, detail="payment not found")

    money = record["amount"]
    context = {
        "payment_id": record["id"],
        "status": record["status"],
        "scenario": record["scenario"],
        # An empty description is displayed as a dash.
        "description": record["description"] or "-",
        "amount": money["value"],
        "currency": money["currency"],
    }
    return HTMLResponse(content=CHECKOUT_TEMPLATE.render(**context))
|
|
|
|
|
|
@app.post("/process/{payment_id}")
async def process_payment(payment_id: str, request: Request):
    """Apply the outcome chosen on the checkout page and redirect the payer.

    The ``action`` form field selects the outcome; when it is missing or
    empty the payment's stored scenario is used. ``success`` marks the
    payment succeeded and paid; ``cancel`` and ``fail`` both mark it
    canceled. The webhook is awaited before the 303 redirect to the
    payment's ``return_url`` is returned.

    Raises:
        HTTPException: 404 when the payment is unknown, 400 when the
            action is not one of success / cancel / fail.
    """
    record = payments_db.get(payment_id)
    if not record:
        raise HTTPException(status_code=404, detail="payment not found")

    submitted = await request.form()
    chosen = str(submitted.get("action") or record["scenario"]).strip().lower()

    # action -> (resulting status, paid flag)
    outcomes = {
        "success": ("succeeded", True),
        "cancel": ("canceled", False),
        "fail": ("canceled", False),
    }
    outcome = outcomes.get(chosen)
    if outcome is None:
        raise HTTPException(status_code=400, detail="unsupported action")

    # NOTE(review): there is no check that the payment is still pending, so
    # a repeated POST overwrites an earlier outcome and sends another
    # webhook - confirm this is intended.
    record["status"], record["paid"] = outcome

    await send_webhook(record)
    return RedirectResponse(url=record["return_url"], status_code=303)
|
|
|
|
|
|
@app.get("/health")
async def health():
    """Report liveness together with the sizes of the in-memory stores."""
    report = {"status": "ok"}
    report["payments_total"] = len(payments_db)
    report["idempotence_keys_total"] = len(idempotence_db)
    return report
|