Async/Await Guide
Tenxyte Core services provide first-class support for asynchronous operations through async/await patterns. This guide explains how to use the async methods effectively in both FastAPI and Django applications.
Table of Contents
- Overview
- Async Methods Reference
- CacheService
- JWTService
- TOTPService
- MagicLinkService
- SessionService
- TaskService
- When to Use Async vs Sync
- Framework-Specific Patterns
- FastAPI (Native Async)
- Django (Async-Compatible)
- Performance Considerations
- Common Pitfalls
Overview
All Tenxyte Core services provide both synchronous and asynchronous versions of their methods:
# Synchronous (blocks until completion)
token = jwt_service.generate_access_token(user_id="123")
# Asynchronous (yields control to event loop)
token = await jwt_service.generate_access_token_async(user_id="123")
Key benefits of using async:
- Non-blocking I/O operations
- Better concurrency handling
- Improved throughput for I/O-bound operations
- Native integration with FastAPI and modern async frameworks
Async Methods Reference
CacheService
All cache operations have async variants that delegate to the underlying cache backend asynchronously:
from tenxyte.core.cache_service import InMemoryCacheService
cache = InMemoryCacheService()
# Basic operations
await cache.set_async("key", "value", timeout=3600)
value = await cache.get_async("key")
exists = await cache.exists_async("key")
await cache.delete_async("key")
# Counter operations
count = await cache.increment_async("counter", delta=1)
# Rate limiting (returns: allowed, remaining, reset_time)
allowed, remaining, reset = await cache.check_rate_limit_async(
key="rate:user:123",
max_requests=100,
window_seconds=60
)
# Token blacklisting (JWT revocation)
await cache.add_to_blacklist_async(jti="token-id", expires_in=3600)
is_blacklisted = await cache.is_blacklisted_async("token-id")
JWTService
Token operations support async for non-blocking validation and blacklisting:
from tenxyte.core.jwt_service import JWTService
jwt_svc = JWTService(settings=settings, blacklist_service=cache)
# Generate tokens (sync only - cryptographic operations)
access_token, jti, exp = jwt_svc.generate_access_token(user_id="123")
refresh_token = jwt_svc.generate_refresh_token(user_id="123")
# Decode and validate (async)
decoded = await jwt_svc.decode_token_async(token)
if decoded and decoded.is_valid:
print(f"User: {decoded.user_id}")
# Blacklisting (async I/O)
await jwt_svc.blacklist_token_async(token, user_id="123")
await jwt_svc.blacklist_token_by_jti_async(jti="token-id", expires_at=datetime)
# Refresh tokens (async)
new_tokens = await jwt_svc.refresh_tokens_async(refresh_token)
if new_tokens:
print(f"New access token: {new_tokens.access_token}")
# User-level revocation (async)
await jwt_svc.revoke_all_user_tokens_async(user_id="123")
TOTPService
Two-factor authentication with async storage adapters:
from tenxyte.core.totp_service import TOTPService
totp_svc = TOTPService(settings=settings)
# Setup 2FA (async with storage)
result = await totp_svc.setup_2fa_async(
user_id="123",
email="user@example.com",
storage=async_storage_adapter
)
# Returns: secret, qr_code (base64), backup_codes
# Confirm setup with initial code
ok, error = await totp_svc.confirm_2fa_setup_async(
user_id="123",
code="123456",
storage=async_storage_adapter
)
# Verify during login
ok, error = await totp_svc.verify_2fa_async(
user_id="123",
code="123456",
storage=async_storage_adapter
)
# Returns (True, "") for valid code
# Returns (True, "") if 2FA not enabled (passthrough)
# Returns (False, "error message") for invalid
# Disable 2FA
ok, error = await totp_svc.disable_2fa_async(
user_id="123",
code="123456",
storage=async_storage_adapter
)
# Regenerate backup codes
ok, new_codes, error = await totp_svc.regenerate_backup_codes_async(
user_id="123",
code="123456", # Current TOTP or backup code
storage=async_storage_adapter
)
# Verify standalone code (for inline checks)
is_valid = await totp_svc.verify_code_async(
secret=decrypted_secret,
code="123456",
user_id="123" # Optional: for replay protection
)
MagicLinkService
Passwordless authentication with async operations:
from tenxyte.core.magic_link_service import MagicLinkService
magic_svc = MagicLinkService(
settings=settings,
email_service=email_service,
repo=async_repo,
user_lookup=async_user_lookup
)
# Request magic link (sends email asynchronously)
success, error = await magic_svc.request_magic_link_async(
email="user@example.com",
application_id="app-123",
ip_address="1.2.3.4",
user_agent="Mozilla/5.0..."
)
# Verify token from link
result = await magic_svc.verify_magic_link_async(
token="token-from-url",
application_id="app-123",
ip_address="1.2.3.4",
require_same_device=True # Optional: IP matching
)
# Returns MagicLinkResult with:
# - success: bool
# - user_id: str (if success)
# - error: str (if failed)
SessionService
Session management with async cache and repository operations:
from tenxyte.core.session_service import SessionService
session_svc = SessionService(
settings=settings,
cache_service=async_cache,
session_repository=async_repo
)
# Create session
session_data = await session_svc.create_session_async(
user=user,
device_id="device-123",
ip_address="1.2.3.4",
user_agent="Mozilla/5.0...",
application_id="app-123"
)
# Returns dict with session_id, device_fingerprint, etc.
# Validate session
session = await session_svc.validate_session_async(session_id)
if session:
print(f"Valid session for user: {session['user_id']}")
# Revoke single session
await session_svc.revoke_session_async(session_id)
# Revoke all user sessions
revoked_count = await session_svc.revoke_all_sessions_async(
user_id="123",
except_session_id="keep-this-one" # Optional
)
TaskService
Background job execution with async support:
from tenxyte.adapters.fastapi.task_service import AsyncIOTaskService
task_svc = AsyncIOTaskService()
# Enqueue sync function (runs in thread pool)
task_id = await task_svc.enqueue_async(send_email, user_id, message)
# Enqueue async function (runs as asyncio task)
task_id = await task_svc.enqueue_async(async_webhook_call, payload)
# Base implementation falls back to to_thread for sync adapters
# Native async adapters (AsyncIOTaskService) handle both optimally
When to Use Async vs Sync
Use Async When:
| Scenario | Reason |
|---|---|
| FastAPI endpoints | Native async framework; blocking sync calls hurt performance |
| I/O operations (cache, DB, email) | Don't block the event loop waiting for network/disk |
| Concurrent token validation | Process multiple requests simultaneously |
| Rate limiting checks | Quick async cache lookups |
| Token blacklisting | Non-blocking write to cache/DB |
Use Sync When:
| Scenario | Reason |
|---|---|
| Cryptographic operations | jwt.encode() is CPU-bound, not I/O; async adds overhead |
| TOTP code generation | pyotp is synchronous and fast; no I/O |
| Password hashing | bcrypt is CPU-bound; use sync |
| Single-threaded scripts | Simpler, no event loop needed |
| Django views (non-async) | Traditional Django is sync-first |
Example: Choosing the Right Method
from tenxyte.core.jwt_service import JWTService
jwt_svc = JWTService(settings=settings)
# FastAPI endpoint - use async
async def create_access_token_async(user_id: str) -> str:
    """Return a freshly generated access token for ``user_id``.

    Declared ``async def`` so it can serve as a FastAPI async endpoint; the
    body itself makes only a synchronous call.
    """
    # generate_access_token is sync (crypto), but that's OK
    # It's fast and CPU-bound
    token, _, _ = jwt_svc.generate_access_token(user_id)
    return token
# FastAPI endpoint - validate with async
async def validate_token_async(token: str) -> Optional[DecodedToken]:
    """Decode and validate ``token`` via ``decode_token_async``.

    Must be ``async def``: the body uses ``await``, which is a SyntaxError
    inside a plain ``def``.
    """
    # decode_token_async allows other requests to proceed
    # while we check the blacklist (I/O operation)
    return await jwt_svc.decode_token_async(token)
# Background task - send revocation notice
async def notify_logout_async(user_id: str, token: str):
    """Blacklist ``token`` on behalf of ``user_id``.

    Must be ``async def``: the body uses ``await``, which is a SyntaxError
    inside a plain ``def``.
    """
    # Blacklist check and add are I/O
    await jwt_svc.blacklist_token_async(token, user_id)
Framework-Specific Patterns
FastAPI (Native Async)
FastAPI is designed for async. Use async def for all endpoints and prefer async Tenxyte methods:
from fastapi import FastAPI, Depends, HTTPException
# HTTPAuthorizationCredentials is the type annotated on the `credentials` parameters below
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from tenxyte.core.jwt_service import JWTService
app = FastAPI()
security = HTTPBearer()
# Initialize services
jwt_svc = JWTService(settings=settings, blacklist_service=cache)
@app.post("/auth/login")
async def login(credentials: LoginCredentials):
    """Check the submitted password and return a new access token.

    Raises HTTPException(401) when the password does not match.
    """
    # NOTE(review): `user` is never assigned in this snippet — presumably it is
    # looked up from `credentials` before this point; confirm.
    # Sync calls are fine here: these are crypto operations, not I/O
    password_ok = verify_password(credentials.password, user.password_hash)
    if not password_ok:
        raise HTTPException(401, "Invalid credentials")
    access, _jti, _exp = jwt_svc.generate_access_token(str(user.id))
    return {"access": access}
@app.get("/protected")
async def protected_route(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Return the user id carried by the caller's bearer token.

    Raises HTTPException(401) when the token is missing, invalid, or blacklisted.
    """
    # Awaiting the async decoder keeps other requests from being blocked
    token_info = await jwt_svc.decode_token_async(credentials.credentials)
    if not (token_info and token_info.is_valid):
        raise HTTPException(401, "Invalid token")
    if token_info.is_blacklisted:
        raise HTTPException(401, "Token revoked")
    return {"user_id": token_info.user_id}
@app.post("/auth/logout")
async def logout(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Blacklist the caller's bearer token and confirm the logout."""
    # The blacklist write is awaited (async blacklisting)
    await jwt_svc.blacklist_token_async(credentials.credentials)
    return {"status": "logged_out"}
Django (Async-Compatible)
Django 4.2+ supports async views. Use async def with Tenxyte's async methods:
# views.py
from django.http import JsonResponse
from django.views import View
from asgiref.sync import sync_to_async
from tenxyte.core.jwt_service import JWTService
jwt_svc = JWTService(settings=settings)
# Async class-based view
class AsyncProtectedView(View):
    """Async class-based view returning the authenticated user's email."""

    async def get(self, request):
        """Validate the bearer token, then load the user and return their email.

        Returns a 401 JSON response when the token is missing or invalid.
        """
        raw_header = request.headers.get("Authorization", "")
        bearer_token = raw_header.replace("Bearer ", "")
        # Async token validation
        token_info = await jwt_svc.decode_token_async(bearer_token)
        if not (token_info and token_info.is_valid):
            return JsonResponse({"error": "Unauthorized"}, status=401)
        # ORM calls are sync, so the lookup is wrapped with sync_to_async
        fetch_user = sync_to_async(User.objects.get)
        account = await fetch_user(id=token_info.user_id)
        return JsonResponse({"user": account.email})
# Async function-based view
async def async_logout(request):
    """Blacklist the token found on ``request`` and confirm the logout."""
    bearer_token = extract_token_from_request(request)
    # The blacklist write is awaited (async blacklisting)
    await jwt_svc.blacklist_token_async(bearer_token)
    return JsonResponse({"status": "logged_out"})
Note: Django's ORM is still primarily synchronous. Use sync_to_async wrapper:
from asgiref.sync import sync_to_async
# Wrap ORM calls
user = await sync_to_async(User.objects.get)(id=user_id)
sessions = await sync_to_async(list)(UserSession.objects.filter(user_id=user_id))
Performance Considerations
Thread Pool vs Native Async
| Adapter | enqueue() | enqueue_async() | Best For |
|---|---|---|---|
| AsyncIOTaskService | run_in_executor (thread pool) | create_task for coroutines | FastAPI, pure async |
| CeleryTaskService | Celery worker | to_thread fallback | Distributed, heavy tasks |
| RQTaskService | RQ worker | to_thread fallback | Redis-based queue |
| SyncThreadTaskService | Thread | to_thread fallback | Dev, no dependencies |
Async Method Implementation Details
# CacheService async methods:
# - InMemoryCacheService: Uses to_thread (locks are sync)
# - RedisCacheService: Uses redis-py async client (truly async I/O)
# JWTService async methods:
# - decode_token_async: Async blacklist check (I/O)
# - blacklist_token_async: Async cache write (I/O)
# - refresh_tokens_async: Async DB/cache operations
# All other crypto operations remain sync (CPU-bound)
Benchmark: Sync vs Async Blacklist Check
import asyncio
import time
# Simulating 100 concurrent token validations
async def benchmark():
    """Time 100 token validations run one by one (sync) and concurrently (async).

    Prints both elapsed times in seconds.
    """
    tokens = [generate_test_token() for _ in range(100)]

    # Sync version (blocking): each decode finishes before the next one starts.
    # perf_counter() is the clock intended for measuring elapsed intervals.
    start = time.perf_counter()
    for token in tokens:
        jwt_svc.decode_token(token)  # Blocking I/O
    sync_time = time.perf_counter() - start

    # Async version (concurrent): all decodes are scheduled together
    start = time.perf_counter()
    await asyncio.gather(*(jwt_svc.decode_token_async(token) for token in tokens))
    async_time = time.perf_counter() - start

    print(f"Sync: {sync_time:.2f}s, Async: {async_time:.2f}s")
# Typical result: Async 5-10x faster for I/O-bound operations
asyncio.run(benchmark())
Common Pitfalls
1. Forgetting await
# WRONG: Returns a coroutine object, not the result
decoded = jwt_svc.decode_token_async(token) # Missing await!
if decoded.is_valid: # AttributeError: 'coroutine' object has no attribute 'is_valid'
...
# CORRECT:
decoded = await jwt_svc.decode_token_async(token)
2. Calling Async in Sync Context Without Event Loop
# WRONG: Can't await outside async function
def sync_function():
    # Intentionally invalid example: `await` appears inside a plain `def`
    result = await cache.get_async("key") # SyntaxError
# CORRECT: Use asyncio.run() or async def
async def async_function():
    """Read "key" from the cache inside an ``async def``, where ``await`` is allowed."""
    result = await cache.get_async("key")
# Or for quick scripts:
import asyncio
result = asyncio.run(cache.get_async("key"))
3. Blocking the Event Loop in Async Context
# WRONG: Blocks entire event loop
async def bad_endpoint():
    # Anti-pattern shown on purpose: a blocking time.sleep() inside a coroutine
    time.sleep(5) # Blocks ALL concurrent requests!
    return {"done": True}
# CORRECT: Use async sleep or run in executor
async def good_endpoint():
    """Wait without blocking: await the sleep, and hand sync I/O to asyncio.to_thread."""
    await asyncio.sleep(5) # Yields to other requests
    # OR for sync I/O:
    await asyncio.to_thread(blocking_io_function)
    return {"done": True}
4. Using Sync Methods in FastAPI Without Care
# RISKY in FastAPI: a long blocking call inside a plain `def` endpoint
# NOTE(review): FastAPI runs plain `def` path operations in a threadpool, so this
# likely ties up a worker thread rather than the event loop itself — confirm
# against the FastAPI concurrency docs before calling it "blocks the event loop".
@app.get("/slow")
def slow_endpoint(): # Notice: def, not async def
    time.sleep(10) # Blocking call: occupies its worker for the full 10 seconds
    return {"done": True}
# CORRECT:
@app.get("/slow")
async def fast_endpoint():
    """Wait 10 seconds with an awaited asyncio.sleep instead of a blocking sleep."""
    await asyncio.sleep(10) # Other requests proceed
    return {"done": True}
5. Mixing Async and Django ORM Incorrectly
# WRONG: Direct ORM call in async view
async def bad_view(request):
    # Anti-pattern shown on purpose: the ORM is called directly from an async view
    user = User.objects.get(id=1) # Synchronous I/O in async context!
# CORRECT: Wrap ORM with sync_to_async
from asgiref.sync import sync_to_async
async def good_view(request):
    """Run ORM queries from an async view by wrapping them with sync_to_async."""
    user = await sync_to_async(User.objects.get)(id=1)
    # OR for queries returning multiple objects:
    users = await sync_to_async(list)(User.objects.all())
Quick Reference: Async Method Cheat Sheet
# CacheService
await cache.get_async(key)
await cache.set_async(key, value, timeout)
await cache.delete_async(key)
await cache.exists_async(key)
await cache.increment_async(key, delta)
await cache.expire_async(key, timeout)
await cache.ttl_async(key)
await cache.add_to_blacklist_async(jti, expires_in)
await cache.is_blacklisted_async(jti)
await cache.remove_from_blacklist_async(jti)
await cache.check_rate_limit_async(key, max_requests, window_seconds)
await cache.reset_rate_limit_async(key)
# JWTService
decoded = await jwt_svc.decode_token_async(token)
await jwt_svc.blacklist_token_async(token, user_id)
await jwt_svc.blacklist_token_by_jti_async(jti, expires_at, user_id)
tokens = await jwt_svc.refresh_tokens_async(refresh_token)
await jwt_svc.revoke_all_user_tokens_async(user_id)
# TOTPService
setup = await totp_svc.setup_2fa_async(user_id, email, storage)
ok, err = await totp_svc.confirm_2fa_setup_async(user_id, code, storage)
ok, err = await totp_svc.verify_2fa_async(user_id, code, storage)
ok, err = await totp_svc.disable_2fa_async(user_id, code, storage)
ok, codes, err = await totp_svc.regenerate_backup_codes_async(user_id, code, storage)
is_valid = await totp_svc.verify_code_async(secret, code, user_id)
# MagicLinkService
ok, err = await magic_svc.request_magic_link_async(email, ...)
result = await magic_svc.verify_magic_link_async(token, ...)
# SessionService
session = await session_svc.create_session_async(user, ...)
session = await session_svc.validate_session_async(session_id)
await session_svc.revoke_session_async(session_id)
count = await session_svc.revoke_all_sessions_async(user_id)
# TaskService
task_id = await task_svc.enqueue_async(func, *args, **kwargs)
Next Steps
- Task Service — Background job processing
- FastAPI Quickstart — Complete FastAPI setup
- Architecture — Understanding Ports and Adapters