Real-World Example — TaskFlow API
A complete project management API built with Lcore, demonstrating every framework feature with 30/30 passing tests.
Overview
TaskFlow is a full-featured project management API (similar to Trello or Jira) that serves as a comprehensive test of every Lcore framework feature. It includes a real SQLite database, multi-source configuration, authentication with signed tokens and cookies, endpoint-level RBAC (admin-only guards on write operations), 10 middleware, custom plugins, four mounted sub-applications, file uploads, SMTP email, async routes, templates, lifecycle hooks, custom error handlers, and a complete single-page frontend served via SimpleTemplate.
Project Structure
backend/
app.py # Main entry point — wires everything together
config.py # Config loading + dataclass validation
models.py # SQLite database layer (schema + seed data)
.env # Environment variables
modules/
__init__.py
auth.py # Token auth, Basic Auth, signed cookies, RBAC
users.py # Users CRUD (all HTTP methods, admin-only guards)
products.py # Projects + Tasks + Comments + File uploads (admin-only guards)
notifications.py # SMTP email sending + async batch
plugins.py # Custom plugins + custom middleware
templates/
welcome.tpl # API overview + interactive playground
frontend.tpl # Full project management frontend (SPA)
error.tpl # Error page template
static/ # Static file directory
uploads/ # File upload directory
Quick Start
cd backend
python app.py
The server starts at http://localhost:8080 with a seeded SQLite database containing 3 users, 2 projects, and 3 tasks.
| Username | Password | Role |
|---|---|---|
admin | admin123 | admin |
alice | alice123 | member |
bob | bob123 | member |
# Login and get a token
curl -X POST http://localhost:8080/auth/login \
-H "Content-Type: application/json" \
-d '{"username":"admin","password":"admin123"}'
# Use the returned token for authenticated requests
curl http://localhost:8080/api/users/ \
-H "Authorization: Bearer <token>"
Configuration
TaskFlow demonstrates Lcore's 3-source configuration loading with dataclass validation. Configuration is loaded in priority order: defaults from a dict, overrides from a .env file, and final overrides from real environment variables.
Config Loading (config.py)
from dataclasses import dataclass
from lcore import Lcore
@dataclass
class AppConfigSchema:
"""Dataclass schema for config validation."""
debug: bool = False
secret_key: str = 'change-me'
host: str = '0.0.0.0'
port: int = 8080
db_path: str = './taskflow.db'
cors_origins: str = '*'
smtp_host: str = 'smtp.gmail.com'
smtp_port: int = 587
smtp_user: str = ''
smtp_password: str = ''
smtp_from: str = '[email protected]'
smtp_use_tls: bool = True
rate_limit_default: int = 100
rate_limit_window: int = 60
max_upload_size: int = 10_485_760
upload_dir: str = './uploads'
def configure_app(app: Lcore) -> None:
"""Load configuration from multiple sources and validate."""
# 1. Load defaults via dict
app.config.load_dict({
'debug': False,
'secret_key': 'change-me-in-production',
'host': '0.0.0.0',
'port': 8080,
'db_path': './taskflow.db',
'cors_origins': '*',
'smtp_host': 'smtp.gmail.com',
'smtp_port': 587,
'smtp_user': '',
'smtp_password': '',
'smtp_from': '[email protected]',
'smtp_use_tls': True,
'rate_limit_default': 100,
'rate_limit_window': 60,
'max_upload_size': 10_485_760,
'upload_dir': './uploads',
})
# 2. Load from .env file (overrides defaults)
env_path = os.path.join(os.path.dirname(__file__), '.env')
if os.path.exists(env_path):
app.config.load_dotenv(env_path)
# 3. Load from environment variables with APP_ prefix (overrides .env)
app.config.load_env('APP_', strip_prefix=True)
# 4. Validate config against schema
try:
app.config.validate_config(AppConfigSchema)
print("[CONFIG] Configuration validated successfully")
except ValueError as e:
print(f"[CONFIG WARNING] {e}")
Each source overrides the previous: load_dict() sets defaults, load_dotenv() overrides from .env, and load_env('APP_') overrides from real environment variables. The final validate_config() ensures all values match the dataclass types.
.env File Format
# TaskFlow API Configuration
APP_NAME=TaskFlow API
APP_DEBUG=true
APP_SECRET_KEY=super-secret-key-change-in-production
APP_HOST=0.0.0.0
APP_PORT=8080
# Database
APP_DB_PATH=./taskflow.db
# CORS
APP_CORS_ORIGINS=http://localhost:3000,http://localhost:5173
# SMTP / Email
APP_SMTP_HOST=smtp.gmail.com
APP_SMTP_PORT=587
[email protected]
APP_SMTP_PASSWORD=your-app-password
[email protected]
APP_SMTP_USE_TLS=true
# Rate Limiting
APP_RATE_LIMIT_DEFAULT=100
APP_RATE_LIMIT_WINDOW=60
# Upload
APP_MAX_UPLOAD_SIZE=10485760
APP_UPLOAD_DIR=./uploads
Database & Models
TaskFlow uses SQLite with a real relational schema. The Database class wraps sqlite3 and is used as a scoped dependency (one connection per request, auto-closed).
Schema (models.py)
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'member',
avatar TEXT,
is_active INTEGER NOT NULL DEFAULT 1,
created_at REAL NOT NULL,
updated_at REAL
);
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT DEFAULT '',
owner_id INTEGER NOT NULL REFERENCES users(id),
status TEXT NOT NULL DEFAULT 'active',
created_at REAL NOT NULL,
updated_at REAL
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT DEFAULT '',
project_id INTEGER NOT NULL REFERENCES projects(id),
assignee_id INTEGER REFERENCES users(id),
creator_id INTEGER NOT NULL REFERENCES users(id),
status TEXT NOT NULL DEFAULT 'todo',
priority TEXT NOT NULL DEFAULT 'medium',
due_date TEXT,
attachment TEXT,
created_at REAL NOT NULL,
updated_at REAL
);
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER NOT NULL REFERENCES tasks(id),
user_id INTEGER NOT NULL REFERENCES users(id),
body TEXT NOT NULL,
created_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS notifications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER REFERENCES users(id),
type TEXT NOT NULL,
recipient TEXT NOT NULL,
subject TEXT NOT NULL,
body TEXT,
status TEXT NOT NULL DEFAULT 'pending',
error TEXT,
created_at REAL NOT NULL
);
"""
Database Class
class Database:
"""SQLite wrapper used as a scoped dependency (one per request)."""
def __init__(self, db_path: str = None):
self.db_path = db_path or DB_PATH
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
self.conn.execute("PRAGMA journal_mode=WAL")
self.conn.execute("PRAGMA foreign_keys=ON")
def execute(self, sql: str, params: tuple = ()) -> sqlite3.Cursor:
return self.conn.execute(sql, params)
def fetchone(self, sql: str, params: tuple = ()) -> dict | None:
row = self.conn.execute(sql, params).fetchone()
return dict(row) if row else None
def fetchall(self, sql: str, params: tuple = ()) -> list[dict]:
rows = self.conn.execute(sql, params).fetchall()
return [dict(r) for r in rows]
def insert(self, table: str, data: dict) -> int:
data['created_at'] = time.time()
cols = ', '.join(data.keys())
placeholders = ', '.join('?' for _ in data)
cur = self.conn.execute(
f"INSERT INTO {table} ({cols}) VALUES ({placeholders})",
tuple(data.values())
)
self.conn.commit()
return cur.lastrowid
def update(self, table: str, row_id: int, data: dict) -> bool:
data['updated_at'] = time.time()
sets = ', '.join(f"{k} = ?" for k in data)
self.conn.execute(
f"UPDATE {table} SET {sets} WHERE id = ?",
(*data.values(), row_id)
)
self.conn.commit()
return True
def delete(self, table: str, row_id: int) -> bool:
cur = self.conn.execute(f"DELETE FROM {table} WHERE id = ?", (row_id,))
self.conn.commit()
return cur.rowcount > 0
def count(self, table: str) -> int:
row = self.conn.execute(f"SELECT COUNT(*) as cnt FROM {table}").fetchone()
return row['cnt']
def close(self):
self.conn.close()
Seed Data
def init_db(db_path: str = None):
"""Initialize the database schema and seed demo data."""
path = db_path or DB_PATH
conn = sqlite3.connect(path)
conn.executescript(SCHEMA)
count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
if count == 0:
now = time.time()
# Admin user (password: admin123)
conn.execute(
"INSERT INTO users (username, email, password_hash, role, created_at) "
"VALUES (?, ?, ?, ?, ?)",
('admin', '[email protected]', hash_password('admin123'), 'admin', now)
)
# Regular users
conn.execute(
"INSERT INTO users (username, email, password_hash, role, created_at) "
"VALUES (?, ?, ?, ?, ?)",
('alice', '[email protected]', hash_password('alice123'), 'member', now)
)
conn.execute(
"INSERT INTO users (username, email, password_hash, role, created_at) "
"VALUES (?, ?, ?, ?, ?)",
('bob', '[email protected]', hash_password('bob123'), 'member', now)
)
# Demo projects and tasks...
conn.commit()
conn.close()
Authentication
TaskFlow implements a complete authentication and authorization system: HMAC-SHA256 signed tokens, PBKDF2-SHA256 password hashing via Lcore's built-in hash_password() / verify_password(), a custom TokenAuthMiddleware, an AdminGuardHook for route-level RBAC, endpoint-level _require_admin() guards on write operations (create/update/delete projects and users), HTTP Basic Auth via @auth_basic, and signed session cookies.
Token Generation & Verification (auth.py)
import hashlib
import hmac
import time
import json
import base64
def generate_token(user_id: int, username: str, role: str) -> str:
"""Generate a signed auth token (base64 JSON + HMAC-SHA256)."""
payload = {
'sub': user_id,
'username': username,
'role': role,
'iat': int(time.time()),
'exp': int(time.time()) + 3600,
}
payload_b64 = base64.urlsafe_b64encode(
json.dumps(payload).encode()
).decode()
signature = hmac.new(
_get_secret().encode(), payload_b64.encode(), hashlib.sha256
).hexdigest()
return f"{payload_b64}.{signature}"
def verify_token(token: str) -> dict | None:
"""Verify and decode a signed auth token."""
try:
payload_b64, signature = token.rsplit('.', 1)
expected_sig = hmac.new(
_get_secret().encode(), payload_b64.encode(), hashlib.sha256
).hexdigest()
if not hmac.compare_digest(signature, expected_sig):
return None
payload = json.loads(base64.urlsafe_b64decode(payload_b64))
if payload.get('exp', 0) < time.time():
return None
return payload
except Exception:
return None
TokenAuthMiddleware
class TokenAuthMiddleware(Middleware):
"""Validates Bearer tokens on protected API routes."""
name = 'token_auth'
order = 6
def __init__(self, skip_paths=None):
self.skip_paths = skip_paths or []
def __call__(self, ctx, next_handler):
path = ctx.request.path
for skip in self.skip_paths:
if skip == path:
return next_handler(ctx)
if len(skip) > 1 and skip.endswith('/') and path.startswith(skip):
return next_handler(ctx)
auth_header = ctx.request.get_header('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
ctx.response.status = 401
return {'error': 'Missing or invalid Authorization header'}
payload = verify_token(auth_header[7:])
if not payload:
ctx.response.status = 401
return {'error': 'Invalid or expired token'}
ctx.user = {
'id': payload['sub'],
'username': payload['username'],
'role': payload['role'],
}
return next_handler(ctx)
AdminGuardHook (RBAC)
class AdminGuardHook(MiddlewareHook):
"""Restricts /admin/* to admin users."""
name = 'admin_guard'
order = 7
def pre(self, ctx):
if not ctx.request.path.startswith('/admin'):
return None
user = getattr(ctx, 'user', None)
if not user or user.get('role') != 'admin':
return HTTPResponse(
body=json.dumps({'error': 'Admin access required'}),
status=403,
headers={'Content-Type': 'application/json'}
)
return None
def post(self, ctx, result):
return result
Endpoint-Level RBAC
Beyond the route-level AdminGuardHook, TaskFlow enforces role-based access directly in endpoint handlers. Write operations on projects and users are restricted to admin users, while members can update their own profile:
# Helper used in users.py and products.py
def _require_admin():
"""Abort 403 if current user is not an admin."""
user = getattr(ctx, 'user', None)
if not user or user.get('role') != 'admin':
abort(403, 'Admin access required')
# Admin-only: create, patch, delete users
@users_app.post('/', name='create_user')
def create_user():
_require_admin()
# ...
# Admin or self: update own profile
@users_app.put('/<id:int>', name='update_user')
def update_user(id):
user = getattr(ctx, 'user', None)
if not user or (user.get('role') != 'admin' and user.get('id') != id):
abort(403, 'Admin access required or can only update own profile')
# ...
# Admin-only: create, update, delete projects
@projects_app.post('/', name='create_project')
def create_project():
_require_admin()
# ...
| Endpoint | Access Rule |
|---|---|
POST /api/users/ | Admin only |
PUT /api/users/<id> | Admin or self (own profile) |
PATCH /api/users/<id> | Admin only |
DELETE /api/users/<id> | Admin only |
POST /api/projects/ | Admin only |
PUT /api/projects/<id> | Admin only |
DELETE /api/projects/<id> | Admin only |
GET endpoints, tasks, comments | Any authenticated user |
HTTP Basic Auth & Signed Cookies
# HTTP Basic Auth on a specific route
@auth_app.get('/basic-demo')
@auth_basic(_check_basic_auth, realm='TaskFlow')
def basic_auth_demo():
"""Protected by HTTP Basic Auth."""
return {'message': 'Authenticated via HTTP Basic Auth!'}
# Login sets a signed session cookie (HMAC-SHA256)
@auth_app.post('/login')
@rate_limit(5, per=300)
@validate_request(body={'username': str, 'password': str})
def login():
"""Authenticate and return a signed token + session cookie."""
data = request.json
db = Database()
try:
user = db.fetchone("SELECT * FROM users WHERE username = ?", (data['username'],))
if not user or not verify_password(data['password'], user['password_hash']):
abort(401, 'Invalid credentials')
token = generate_token(user['id'], user['username'], user['role'])
# Signed cookie for session-based auth
response.set_cookie('session_user', user['username'],
secret=_get_secret(), path='/',
httponly=True, samesite='Lax', max_age=3600)
return {
'token': token,
'user': {'id': user['id'], 'username': user['username'], 'role': user['role']}
}
finally:
db.close()
# Read signed cookie
@auth_app.get('/me')
def me():
"""Get current user from signed session cookie."""
username = request.get_cookie('session_user', secret=_get_secret())
if not username:
abort(401, 'Not authenticated')
# ...
Middleware Stack
TaskFlow uses all 7 built-in Lcore middleware plus 3 custom middleware, for a total of 10 active middleware ordered by priority.
| Order | Middleware | Type | Purpose |
|---|---|---|---|
| 0 | BodyLimitMiddleware | Built-in | Reject oversized request bodies |
| 1 | RequestIDMiddleware | Built-in | Generate/propagate X-Request-ID |
| 2 | RequestLoggerMiddleware | Built-in | Structured JSON request logging |
| 2 | TimingMiddleware | Custom | X-Response-Time header |
| 3 | CORSMiddleware | Built-in | Full CORS with preflight |
| 5 | SecurityHeadersMiddleware | Built-in | HSTS, XSS, clickjacking headers |
| 6 | TokenAuthMiddleware | Custom | Bearer token validation |
| 7 | AdminGuardHook | Custom | RBAC for /admin/* routes |
| 8 | AuditLogHook | Custom | Log state-changing requests |
| 90 | CompressionMiddleware | Built-in | Gzip compression |
Registration Code (app.py)
from lcore import (
Lcore,
CORSMiddleware, SecurityHeadersMiddleware, CSRFMiddleware,
RequestIDMiddleware, RequestLoggerMiddleware,
BodyLimitMiddleware, CompressionMiddleware,
)
from modules.plugins import TimingMiddleware, AuditLogHook
from modules.auth import TokenAuthMiddleware, AdminGuardHook
app = Lcore()
# Built-in middleware
app.use(BodyLimitMiddleware(
max_size=int(app.config.get('max_upload_size', 10_485_760))
))
app.use(RequestIDMiddleware())
app.use(RequestLoggerMiddleware(logger=logging.getLogger('http')))
# Custom timing middleware
app.use(TimingMiddleware())
# CORS with full configuration
app.use(CORSMiddleware(
allow_origins=cors_origins,
allow_methods=['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'],
allow_headers=['Content-Type', 'Authorization', 'X-CSRF-Token', 'X-Requested-With'],
expose_headers=['X-Request-ID', 'X-Response-Time', 'X-API-Version'],
allow_credentials=True,
max_age=86400,
))
app.use(SecurityHeadersMiddleware(hsts=True, hsts_max_age=31536000))
# Custom auth middleware with path skipping
app.use(TokenAuthMiddleware(
skip_paths=['/auth/', '/health', '/docs', '/', '/static/', '/debug/', '/old-api']
))
app.use(AdminGuardHook())
app.use(AuditLogHook())
# Compression (high order = runs last)
app.use(CompressionMiddleware(min_size=256, level=6))
CSRFMiddleware is imported but not activated. It is designed for server-rendered form-based apps, not JSON APIs with Bearer token auth where cross-origin requests cannot set custom Authorization headers.
Custom Plugins
TaskFlow includes two custom plugins that demonstrate the setup(app) / apply(callback, route) plugin lifecycle.
APIVersionPlugin (plugins.py)
class APIVersionPlugin:
"""Adds API version info to every JSON response."""
name = 'api_version'
api = 2
def __init__(self, version='0.0.1'):
self.version = version
def setup(self, app):
"""Called once when the plugin is installed."""
self.app = app
print(f" [Plugin] APIVersionPlugin v{self.version} installed")
def apply(self, callback, route):
"""Called for each route. Wraps the callback."""
# Skip if route opts out
if route.config.get('skip_versioning'):
return callback
version = self.version
def wrapper(*args, **kwargs):
result = callback(*args, **kwargs)
if isinstance(result, dict):
result['_api_version'] = version
response.set_header('X-API-Version', version)
return result
return wrapper
RequestCounterPlugin (plugins.py)
class RequestCounterPlugin:
"""Tracks total request count per route."""
name = 'request_counter'
api = 2
def __init__(self):
self.counts = {}
def setup(self, app):
self.app = app
def apply(self, callback, route):
rule = route.rule
self.counts.setdefault(rule, 0)
counts = self.counts
def wrapper(*args, **kwargs):
counts[rule] = counts.get(rule, 0) + 1
result = callback(*args, **kwargs)
if isinstance(result, dict):
result['_request_number'] = counts[rule]
return result
return wrapper
def get_stats(self):
return dict(self.counts)
def close(self):
"""Called when plugin is uninstalled."""
print(f" [Plugin] RequestCounterPlugin closed. Final stats: {self.counts}")
Installing Plugins (app.py)
version_plugin = APIVersionPlugin(version='0.0.1')
app.install(version_plugin)
counter_plugin = RequestCounterPlugin()
app.install(counter_plugin)
Routes can opt out of specific plugins. The home page uses skip_versioning=True to prevent the version plugin from modifying its template response: @app.get('/', name='home', skip_versioning=True)
Module Mounting
TaskFlow is organized into four sub-applications, each defined as an independent Lcore() instance and mounted at a URL prefix. This demonstrates Lcore's modular architecture.
from modules.auth import auth_app
from modules.users import users_app
from modules.products import projects_app
from modules.notifications import notifications_app
app = Lcore()
# Mount sub-applications at URL prefixes
app.mount('/auth/', auth_app)
app.mount('/api/users/', users_app)
app.mount('/api/projects/', projects_app)
app.mount('/api/notifications/', notifications_app)
| Prefix | Sub-App | Source File | Purpose |
|---|---|---|---|
/auth/ | auth_app | modules/auth.py | Login, register, session, Basic Auth |
/api/users/ | users_app | modules/users.py | User CRUD with all HTTP methods |
/api/projects/ | projects_app | modules/products.py | Projects, tasks, comments, uploads |
/api/notifications/ | notifications_app | modules/notifications.py | SMTP email, batch notify |
Each sub-app is a fully independent Lcore() instance with its own routes, hooks, and middleware:
# modules/users.py
from lcore import Lcore, request
users_app = Lcore()
@users_app.hook('before_request')
def log_user_request():
print(f" [Users] {request.method} {request.path}")
@users_app.get('/', name='list_users')
def list_users():
"""List all team members."""
# ...
Dependency Injection
TaskFlow registers three dependencies with different lifetimes, demonstrating all three DI modes.
import uuid
import time
from models import Database
class AppCache:
"""In-memory cache (singleton lifetime)."""
def __init__(self):
self._store = {}
def get(self, key):
entry = self._store.get(key)
if entry and entry['expires'] > time.time():
return entry['value']
return None
def set(self, key, value, ttl=300):
self._store[key] = {'value': value, 'expires': time.time() + ttl}
def invalidate(self, key):
self._store.pop(key, None)
# Singleton: one instance for the entire app lifetime
app.inject('cache', AppCache, lifetime='singleton')
# Scoped: one instance per request, auto-closed via .close()
app.inject('db', lambda: Database(db_path), lifetime='scoped')
# Transient: new value every time it is accessed
app.inject('trace_id', lambda: str(uuid.uuid4()), lifetime='transient')
Access injected dependencies anywhere via ctx:
@app.get('/debug/di', name='debug_di')
def debug_di():
"""Dependency injection demo - shows different lifetimes."""
return {
'trace_id_1': ctx.trace_id,
'trace_id_2': ctx.trace_id, # Different each access (transient)
'cache_type': type(ctx.cache).__name__,
'db_type': type(ctx.db).__name__,
}
| Name | Lifetime | Type | Behavior |
|---|---|---|---|
ctx.cache | singleton | AppCache | Same instance for all requests |
ctx.db | scoped | Database | One per request, .close() called automatically |
ctx.trace_id | transient | str | New UUID on every access |
CRUD Operations
The Users module demonstrates all five HTTP method decorators with typed route parameters, rate limiting, request validation, pagination, and search. Write operations (POST, PUT, PATCH, DELETE) are protected by admin-only guards, except PUT which also allows users to update their own profile.
GET — List with Pagination
@users_app.get('/', name='list_users')
@rate_limit(100, per=60)
def list_users():
"""List all team members with optional role filter and pagination."""
db = _get_db()
try:
role = request.query.get('role')
page = int(request.query.get('page', '1'))
limit = int(request.query.get('limit', '20'))
offset = (page - 1) * limit
if role:
users = db.fetchall(
"SELECT id, username, email, role, is_active, created_at "
"FROM users WHERE role = ? LIMIT ? OFFSET ?",
(role, limit, offset)
)
total = db.fetchone("SELECT COUNT(*) as cnt FROM users WHERE role = ?", (role,))['cnt']
else:
users = db.fetchall(
"SELECT id, username, email, role, is_active, created_at "
"FROM users LIMIT ? OFFSET ?",
(limit, offset)
)
total = db.count('users')
return {'users': users, 'total': total, 'page': page, 'limit': limit}
finally:
db.close()
POST — Create
@users_app.post('/', name='create_user')
@rate_limit(20, per=60)
@validate_request(body={'username': str, 'email': str, 'password': str})
def create_user():
"""Invite a new team member."""
data = request.json
db = _get_db()
try:
if db.fetchone("SELECT id FROM users WHERE username = ?", (data['username'],)):
abort(409, 'Username already exists')
user_id = db.insert('users', {
'username': data['username'],
'email': data['email'],
'password_hash': hash_password(data['password']),
'role': data.get('role', 'member'),
'is_active': 1,
})
response.status = 201
return {'id': user_id, 'username': data['username'], 'created': True}
finally:
db.close()
GET — Read by ID
@users_app.get('/<id:int>', name='get_user')
@rate_limit(200, per=60)
def get_user(id):
"""Fetch a team member by ID."""
db = _get_db()
try:
user = db.fetchone(
"SELECT id, username, email, role, is_active, avatar, created_at "
"FROM users WHERE id = ?",
(id,)
)
if not user:
abort(404, 'User not found')
return user
finally:
db.close()
PUT — Full Update (Admin or Self)
@users_app.put('/<id:int>', name='update_user')
@rate_limit(50, per=60)
@validate_request(body={'username': str, 'email': str})
def update_user(id):
"""Full update of a user profile. Admin or self only."""
user = getattr(ctx, 'user', None)
if not user or (user.get('role') != 'admin' and user.get('id') != id):
abort(403, 'Admin access required or can only update own profile')
data = request.json
db = _get_db()
try:
if not db.fetchone("SELECT id FROM users WHERE id = ?", (id,)):
abort(404, 'User not found')
db.update('users', id, {'username': data['username'], 'email': data['email']})
return {'id': id, 'updated': True}
finally:
db.close()
PATCH — Partial Update
@users_app.patch('/<id:int>', name='patch_user')
@rate_limit(50, per=60)
def patch_user(id):
"""Partial update (change role, deactivate, etc.)."""
data = request.json
if not data:
abort(400, 'No data provided')
db = _get_db()
try:
if not db.fetchone("SELECT id FROM users WHERE id = ?", (id,)):
abort(404, 'User not found')
allowed = {'username', 'email', 'role', 'is_active'}
update_data = {k: v for k, v in data.items() if k in allowed}
if not update_data:
abort(400, 'No valid fields to update')
db.update('users', id, update_data)
return {'id': id, 'patched': True, 'fields': list(update_data.keys())}
finally:
db.close()
DELETE — Remove
@users_app.delete('/<id:int>', name='delete_user')
@rate_limit(10, per=60)
def delete_user(id):
"""Remove a team member."""
db = _get_db()
try:
if not db.fetchone("SELECT id FROM users WHERE id = ?", (id,)):
abort(404, 'User not found')
db.delete('users', id)
return {'id': id, 'deleted': True}
finally:
db.close()
File Uploads
Task attachments can be uploaded via multipart form data. Uploaded files are saved to disk and served via static_file().
Upload Handler (products.py)
@projects_app.post('/<project_id:int>/tasks/<task_id:int>/attachment', name='upload_attachment')
@rate_limit(10, per=60)
def upload_attachment(project_id, task_id):
"""Upload a file attachment to a task."""
db = _get_db()
try:
if not db.fetchone("SELECT id FROM tasks WHERE id = ? AND project_id = ?",
(task_id, project_id)):
abort(404, 'Task not found')
upload = request.files.get('file')
if not upload:
abort(400, 'No file provided (field name: "file")')
upload_dir = os.path.join(os.path.dirname(__file__), '..', 'uploads')
os.makedirs(upload_dir, exist_ok=True)
safe_name = f"task_{task_id}_{upload.filename}"
upload.save(os.path.join(upload_dir, safe_name), overwrite=True)
db.update('tasks', task_id, {'attachment': safe_name})
return {
'task_id': task_id,
'filename': safe_name,
'content_type': upload.content_type,
'uploaded': True,
}
finally:
db.close()
Serving Attachments
@projects_app.get('/attachments/<filename:path>', name='serve_attachment')
def serve_attachment(filename):
"""Serve uploaded task attachments. Demonstrates static_file()."""
upload_dir = os.path.join(os.path.dirname(__file__), '..', 'uploads')
return static_file(filename, root=upload_dir,
headers={'Cache-Control': 'max-age=3600'})
# Upload a file
curl -X POST -F "[email protected]" \
http://localhost:8080/api/projects/1/tasks/1/attachment \
-H "Authorization: Bearer <token>"
# Download it
curl http://localhost:8080/api/projects/attachments/task_1_report.pdf \
-H "Authorization: Bearer <token>"
Async Routes
Lcore supports async def route handlers. TaskFlow uses asyncio.gather in the dashboard endpoint to run multiple database queries concurrently.
import asyncio
@projects_app.get('/dashboard/overview')
async def dashboard_overview():
"""Project dashboard with aggregated stats."""
db = _get_db()
try:
async def get_project_stats():
await asyncio.sleep(0.01)
return db.fetchall(
"""SELECT p.id, p.name, p.status,
COUNT(t.id) as task_count,
SUM(CASE WHEN t.status = 'done' THEN 1 ELSE 0 END) as done_count
FROM projects p LEFT JOIN tasks t ON p.id = t.project_id
GROUP BY p.id"""
)
async def get_overdue_tasks():
await asyncio.sleep(0.01)
return db.fetchall(
"""SELECT t.id, t.title, t.due_date, p.name as project_name
FROM tasks t JOIN projects p ON t.project_id = p.id
WHERE t.due_date < date('now') AND t.status != 'done'"""
)
projects, overdue = await asyncio.gather(
get_project_stats(),
get_overdue_tasks()
)
total_tasks = db.count('tasks')
total_users = db.count('users')
return {
'projects': projects,
'overdue_tasks': overdue,
'totals': {
'projects': len(projects),
'tasks': total_tasks,
'users': total_users,
'overdue': len(overdue),
}
}
finally:
db.close()
SMTP Email
The notifications module includes a full EmailService that sends emails via Python's smtplib. When SMTP credentials are not configured, it runs in simulated mode, logging emails to the database without sending them.
EmailService (notifications.py)
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class EmailService:
"""SMTP email sender. Simulates when credentials aren't configured."""
def __init__(self):
self.host = os.environ.get('APP_SMTP_HOST', 'smtp.gmail.com')
self.port = int(os.environ.get('APP_SMTP_PORT', '587'))
self.user = os.environ.get('APP_SMTP_USER', '')
self.password = os.environ.get('APP_SMTP_PASSWORD', '')
self.from_addr = os.environ.get('APP_SMTP_FROM', '[email protected]')
self.use_tls = os.environ.get('APP_SMTP_USE_TLS', 'true').lower() == 'true'
def send(self, to: str, subject: str, body: str, html: bool = False) -> dict:
msg = MIMEMultipart('alternative')
msg['From'] = self.from_addr
msg['To'] = to
msg['Subject'] = subject
msg.attach(MIMEText(body, 'html' if html else 'plain'))
db = Database()
try:
if not self.user or self.user == '[email protected]':
# Simulate - no real SMTP configured
db.insert('notifications', {
'type': 'email', 'recipient': to, 'subject': subject,
'body': body, 'status': 'simulated',
})
return {
'status': 'simulated', 'to': to, 'subject': subject,
'note': 'SMTP not configured - email simulated.',
}
# Real SMTP
server = smtplib.SMTP(self.host, self.port, timeout=10)
if self.use_tls:
server.starttls()
server.login(self.user, self.password)
server.sendmail(self.from_addr, [to], msg.as_string())
server.quit()
db.insert('notifications', {
'type': 'email', 'recipient': to, 'subject': subject,
'body': body, 'status': 'sent',
})
return {'status': 'sent', 'to': to, 'subject': subject}
except Exception as e:
db.insert('notifications', {
'type': 'email', 'recipient': to, 'subject': subject,
'status': 'failed', 'error': str(e),
})
return {'status': 'failed', 'to': to, 'error': str(e)}
finally:
db.close()
Usage in Routes
@notifications_app.post('/email', name='send_email')
@rate_limit(10, per=60)
@validate_request(body={'to': str, 'subject': str, 'body': str})
def send_email():
"""Send an email notification via SMTP."""
data = request.json
svc = EmailService()
return svc.send(data['to'], data['subject'], data['body'], data.get('html', False))
@notifications_app.post('/task-assigned/<task_id:int>', name='notify_assignment')
@rate_limit(20, per=60)
def notify_assignment(task_id):
"""Notify a user they've been assigned a task."""
db = Database()
try:
task = db.fetchone(
"""SELECT t.*, u.email as assignee_email, u.username as assignee_name,
p.name as project_name
FROM tasks t
JOIN users u ON t.assignee_id = u.id
JOIN projects p ON t.project_id = p.id
WHERE t.id = ?""",
(task_id,)
)
if not task:
abort(404, 'Task not found or has no assignee')
svc = EmailService()
return svc.send(
to=task['assignee_email'],
subject=f"[TaskFlow] You've been assigned: {task['title']}",
body=f"<h2>New Task Assignment</h2>...",
html=True,
)
finally:
db.close()
Templates
Lcore's .tpl files use SimpleTemplate, the built-in template engine. SimpleTemplate files are standard HTML with embedded Python expressions and control flow. There is no special compilation step — they are rendered at request time via template('name', key=value).
Syntax Reference
| Syntax | Purpose | Example |
|---|---|---|
{{variable}} | Output (HTML-escaped) | {{title}} |
{{!variable}} | Output (raw/unescaped) | {{!raw_html}} |
% for x in y: | Loop | % for item in items: |
% if cond: | Conditional | % if user: |
% end | End block | % end |
% include('x') | Include another template | % include('footer') |
Welcome Page Template (welcome.tpl)
<!DOCTYPE html>
<html>
<head>
<title>{{title}}</title>
<style>
/* ... CSS styles ... */
</style>
</head>
<body>
<div class="container">
<h1>{{title}}</h1>
<p class="subtitle">{{description}}</p>
<div class="features">
% for feature in features:
<div class="feature">
<h3>{{feature['name']}}</h3>
<p>{{feature['desc']}}</p>
</div>
% end
</div>
<div class="endpoints">
% for group in endpoint_groups:
<div class="endpoint-group">
<div class="group-title">{{group['name']}}</div>
% for ep in group['endpoints']:
<div class="endpoint">
<span class="method {{ep['method'].lower()}}">{{ep['method']}}</span>
<span class="path">{{ep['path']}}</span>
<span class="desc">{{ep['desc']}}</span>
</div>
% end
</div>
% end
</div>
<div class="footer">
Lcore Framework v{{version}} · Running on {{host}}:{{port}}
</div>
</div>
</body>
</html>
Rendering from a Route (app.py)
from lcore import template, TEMPLATE_PATH
# Register template directory
TEMPLATE_PATH.insert(0, os.path.join(os.path.dirname(__file__), 'templates'))
@app.get('/', name='home', skip_versioning=True)
def home():
"""TaskFlow welcome page with API overview."""
return template('welcome',
title='TaskFlow API',
description='Project management API built with the Lcore framework',
version='0.0.1',
features=[
{'name': 'SQLite DB', 'desc': 'Real database with projects, tasks, users, comments'},
{'name': '.env Config', 'desc': 'Multi-source config loading + dataclass validation'},
# ...
],
endpoint_groups=[
{'name': 'Auth', 'endpoints': [
{'method': 'POST', 'path': '/auth/login', 'desc': 'Login with credentials'},
# ...
]},
],
)
Lifecycle Hooks
TaskFlow uses five lifecycle hooks to inject behavior at different points in the request lifecycle.
| Hook | Where Used | Purpose |
|---|---|---|
on_request_start | app.py | Record request start time in ctx.state |
before_request | app.py, users.py | Log incoming request method and path |
after_request | app.py | Add X-Powered-By: Lcore header |
on_response_send | app.py | Log response status code and duration |
on_module_mount | app.py | Log when sub-applications are mounted |
@app.hook('on_request_start')
def on_request_start():
ctx.state['start_time'] = time.time()
@app.hook('before_request')
def before_request():
logger.info(f"-> {request.method} {request.path}")
@app.hook('after_request')
def after_request():
response.set_header('X-Powered-By', 'Lcore')
@app.hook('on_response_send')
def on_response_send():
start = ctx.state.get('start_time', 0)
duration = (time.time() - start) * 1000
logger.info(f"<- {request.method} {request.path} [{response.status_code}] {duration:.1f}ms")
@app.hook('on_module_mount')
def on_module_mount(prefix, child):
logger.info(f" Mounted module at {prefix}")
The before_request hook is also used at the module level in users.py:
# modules/users.py
@users_app.hook('before_request')
def log_user_request():
print(f" [Users] {request.method} {request.path}")
Error Handling
TaskFlow registers custom error handlers for HTTP 404, 500, 429, and 403. The 404 and 500 handlers perform content negotiation: they return JSON if the client sends Accept: application/json, or render an HTML template otherwise.
@app.error(404)
def error_404(error):
if 'application/json' in (request.get_header('Accept') or ''):
return {'error': 'Not Found', 'path': request.path, 'status': 404}
return template('error', status_code=404, status_text='Not Found',
message=f"The path '{request.path}' does not exist.")
@app.error(500)
def error_500(error):
if 'application/json' in (request.get_header('Accept') or ''):
return {'error': 'Internal Server Error', 'status': 500}
return template('error', status_code=500, status_text='Internal Server Error',
message='Something went wrong. Please try again later.')
@app.error(429)
def error_429(error):
return {'error': 'Too Many Requests', 'status': 429,
'message': 'Rate limit exceeded. Please slow down.'}
@app.error(403)
def error_403(error):
return {'error': 'Forbidden', 'status': 403,
'message': 'You do not have permission to access this resource.'}
Error Template (error.tpl)
<!DOCTYPE html>
<html>
<head>
<title>Error {{status_code}}</title>
</head>
<body>
<div class="card">
<h1>{{status_code}}</h1>
<h2>{{status_text}}</h2>
<p>{{message}}</p>
<p><a href="/">Back to Home</a></p>
</div>
</body>
</html>
Auto API Docs
When running in debug mode, TaskFlow enables Lcore's built-in API documentation at /docs. This auto-generates an interactive page listing all registered routes with their methods, parameters, and docstrings.
is_debug = str(app.config.get('debug', 'false')).lower() in ('true', '1', 'yes')
if is_debug:
app.enable_docs()
logger.info(" API docs at /docs")
This creates two endpoints:
| Path | Description |
|---|---|
/docs | Interactive HTML documentation page |
/docs/json | Raw JSON API schema |
API docs are only enabled when debug=true in the configuration. In production, they are disabled by default to avoid exposing your route structure publicly.
Frontend Application
TaskFlow includes a complete single-page frontend at /frontend, built entirely with Lcore's SimpleTemplate engine. No build tools, no npm, no JavaScript frameworks — just HTML, CSS, and vanilla JavaScript served directly by the backend as a .tpl template.
Pages
| Page | Description |
|---|---|
| Login | Demo credential quick-fill, token stored in localStorage |
| Dashboard | Stats overview (projects, tasks, team, overdue), project summary table, recent activity feed |
| Projects | List all projects, create/delete (admin only) |
| Project Detail | Kanban task board (To Do / In Progress / Done), create new tasks with assignee, priority, due date |
| Task Detail | Full task view, status change buttons, comments thread, delete (admin only) |
| Team | Member list with role badges, add/remove members (admin only) |
| Notifications | Email history log, compose and send emails |
| My Profile | View and edit own username and email, account info and permissions summary |
Role-Based UI
The frontend checks state.user.role to conditionally show admin-only UI elements:
function isAdmin() {
return state.user && state.user.role === 'admin';
}
// Only admins see the create/delete buttons
document.getElementById('topbar-actions').innerHTML =
isAdmin() ? '<button onclick="openNewProjectModal()">+ New Project</button>' : '';
// Members see projects and tasks, but no destructive actions
if (isAdmin()) html += '<button onclick="deleteProject(id)">Delete</button>';
Backend enforcement matches the UI: even if a member bypasses the frontend and calls the API directly, _require_admin() returns 403 Forbidden. The profile update (PUT /api/users/<id>) allows admin or the user updating their own account.
How It Works
The frontend is a single frontend.tpl file served by a Lcore route:
@app.get('/frontend', name='frontend', skip_versioning=True)
def frontend():
"""TaskFlow frontend — full project management UI."""
return template('frontend')
It uses fetch() to call the same API endpoints, with the Bearer token stored in localStorage for persistent login across page refreshes.
Test Results
TaskFlow includes a comprehensive test suite that validates all 30 endpoints. Every test passes.
PASS: Unauthenticated = 401
PASS: Login
PASS: Register
PASS: Basic Auth
PASS: Health
PASS: List users
PASS: Get user
PASS: Create user
PASS: Patch user
PASS: Search users
PASS: List projects
PASS: Get project
PASS: Create project
PASS: List tasks
PASS: Create task
PASS: Update task
PASS: Get task
PASS: Add comment
PASS: Send email
PASS: Batch notify
PASS: History
PASS: Dashboard (async)
PASS: Activity
PASS: Admin (admin)
PASS: Admin (member=403)
PASS: Routes
PASS: Middleware
PASS: Config
PASS: Stats
PASS: Redirect 301
30 passed, 0 failed
Test Coverage by Feature
| Category | Tests | What is Verified |
|---|---|---|
| Authentication | 4 | 401 without token, login, register, Basic Auth |
| System | 1 | Health check endpoint |
| Users CRUD | 5 | List, get, create, patch, search |
| Projects & Tasks | 8 | List/get/create projects, list/create/update/get tasks, add comment |
| Notifications | 3 | Send email, batch notify (async), history |
| Dashboard | 2 | Dashboard overview (async), activity feed |
| Admin & RBAC | 2 | Admin access with admin token, 403 with member token. Endpoint-level guards on user/project write operations |
| Debug | 4 | Routes, middleware, config (redacted), stats |
| Redirect | 1 | 301 redirect from /old-api to /api/projects/ |
WebSocket Note
Lcore does not natively support WebSockets. Lcore is a WSGI (Web Server Gateway Interface) framework, and WSGI is fundamentally a synchronous request/response protocol. Each WSGI request follows a strict cycle: the server receives a request, passes it to the application, the application returns a response, and the connection closes.
WebSockets require persistent, bidirectional connections where both the client and server can send messages at any time. This model is incompatible with WSGI's one-request-one-response architecture. WSGI has no mechanism for keeping a connection open or pushing data to the client outside of a response.
A companion WebSocket library designed to work alongside the Lcore framework is currently in development and will be released in a later phase. This library will allow you to run a WebSocket server side-by-side with your Lcore application, sharing configuration, authentication, and application logic seamlessly.
In the meantime, if your application needs WebSocket support, you have two options:
| Approach | Description |
|---|---|
| Separate WebSocket server | Run a dedicated WebSocket server (e.g., websockets, socket.io) alongside Lcore on a different port |
| ASGI framework | Use an ASGI-based framework (e.g., Starlette, FastAPI) that natively supports both HTTP and WebSocket protocols |
Lcore is intentionally WSGI-based for simplicity, broad server compatibility (Gunicorn, Waitress, uWSGI, etc.), and a straightforward programming model. WSGI remains the most widely deployed Python web standard. For the vast majority of REST APIs, WSGI is more than sufficient.
Lcore