Real-World Example — TaskFlow API

A complete project management API built with Lcore, demonstrating every framework feature with 30/30 passing tests.

Overview

TaskFlow is a full-featured project management API (similar to Trello or Jira) that serves as a comprehensive test of every Lcore framework feature. It includes a real SQLite database, multi-source configuration, authentication with signed tokens and cookies, endpoint-level RBAC (admin-only guards on write operations), 10 middleware, custom plugins, four mounted sub-applications, file uploads, SMTP email, async routes, templates, lifecycle hooks, custom error handlers, and a complete single-page frontend served via SimpleTemplate.

Project Structure

backend/
  app.py                  # Main entry point — wires everything together
  config.py               # Config loading + dataclass validation
  models.py               # SQLite database layer (schema + seed data)
  .env                    # Environment variables
  modules/
    __init__.py
    auth.py               # Token auth, Basic Auth, signed cookies, RBAC
    users.py              # Users CRUD (all HTTP methods, admin-only guards)
    products.py           # Projects + Tasks + Comments + File uploads (admin-only guards)
    notifications.py      # SMTP email sending + async batch
    plugins.py            # Custom plugins + custom middleware
  templates/
    welcome.tpl           # API overview + interactive playground
    frontend.tpl          # Full project management frontend (SPA)
    error.tpl             # Error page template
  static/                 # Static file directory
  uploads/                # File upload directory

Quick Start

cd backend
python app.py

The server starts at http://localhost:8080 with a seeded SQLite database containing 3 users, 2 projects, and 3 tasks.

Username   Password   Role
admin      admin123   admin
alice      alice123   member
bob        bob123     member

# Login and get a token
curl -X POST http://localhost:8080/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":"admin123"}'

# Use the returned token for authenticated requests
curl http://localhost:8080/api/users/ \
  -H "Authorization: Bearer <token>"

Configuration

TaskFlow demonstrates Lcore's three-source configuration loading with dataclass validation. Sources are applied from lowest to highest priority: defaults from a dict, overrides from a .env file, and final overrides from real environment variables.

Config Loading (config.py)

import os
from dataclasses import dataclass
from lcore import Lcore


@dataclass
class AppConfigSchema:
    """Dataclass schema for config validation."""
    debug: bool = False
    secret_key: str = 'change-me'
    host: str = '0.0.0.0'
    port: int = 8080
    db_path: str = './taskflow.db'
    cors_origins: str = '*'
    smtp_host: str = 'smtp.gmail.com'
    smtp_port: int = 587
    smtp_user: str = ''
    smtp_password: str = ''
    smtp_from: str = '[email protected]'
    smtp_use_tls: bool = True
    rate_limit_default: int = 100
    rate_limit_window: int = 60
    max_upload_size: int = 10_485_760
    upload_dir: str = './uploads'


def configure_app(app: Lcore) -> None:
    """Load configuration from multiple sources and validate."""

    # 1. Load defaults via dict
    app.config.load_dict({
        'debug': False,
        'secret_key': 'change-me-in-production',
        'host': '0.0.0.0',
        'port': 8080,
        'db_path': './taskflow.db',
        'cors_origins': '*',
        'smtp_host': 'smtp.gmail.com',
        'smtp_port': 587,
        'smtp_user': '',
        'smtp_password': '',
        'smtp_from': '[email protected]',
        'smtp_use_tls': True,
        'rate_limit_default': 100,
        'rate_limit_window': 60,
        'max_upload_size': 10_485_760,
        'upload_dir': './uploads',
    })

    # 2. Load from .env file (overrides defaults)
    env_path = os.path.join(os.path.dirname(__file__), '.env')
    if os.path.exists(env_path):
        app.config.load_dotenv(env_path)

    # 3. Load from environment variables with APP_ prefix (overrides .env)
    app.config.load_env('APP_', strip_prefix=True)

    # 4. Validate config against schema
    try:
        app.config.validate_config(AppConfigSchema)
        print("[CONFIG] Configuration validated successfully")
    except ValueError as e:
        print(f"[CONFIG WARNING] {e}")

Priority Order

Each source overrides the previous: load_dict() sets defaults, load_dotenv() overrides from .env, and load_env('APP_') overrides from real environment variables. The final validate_config() ensures all values match the dataclass types.
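
The override order can be illustrated with a small stdlib-only sketch. It does not use Lcore: parse_dotenv, strip_prefix, and layered_config are hypothetical helpers, and the assumption that APP_-prefixed keys are stripped and lowercased is made for the demo rather than taken from Lcore's documentation.

```python
def parse_dotenv(text: str) -> dict:
    """Parse KEY=VALUE lines, skipping blanks and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, _, value = line.partition('=')
        values[key.strip()] = value.strip()
    return values


def strip_prefix(values: dict, prefix: str) -> dict:
    """Keep only prefixed keys, drop the prefix, lowercase the remainder."""
    return {
        key[len(prefix):].lower(): value
        for key, value in values.items()
        if key.startswith(prefix)
    }


def layered_config(defaults: dict, dotenv_text: str, environ: dict) -> dict:
    """Merge three sources; each later source overrides the earlier ones."""
    merged = dict(defaults)
    merged.update(strip_prefix(parse_dotenv(dotenv_text), 'APP_'))
    merged.update(strip_prefix(environ, 'APP_'))
    return merged


config = layered_config(
    defaults={'port': 8080, 'debug': False},
    dotenv_text="# comment\nAPP_PORT=9000\nAPP_DEBUG=true\n",
    environ={'APP_PORT': '9500', 'HOME': '/root'},
)
print(config)  # {'port': '9500', 'debug': 'true'}
```

Values coming from .env files and the environment are strings, which is why a type-checking step such as validate_config() is useful after the merge.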

.env File Format

# TaskFlow API Configuration
APP_NAME=TaskFlow API
APP_DEBUG=true
APP_SECRET_KEY=super-secret-key-change-in-production
APP_HOST=0.0.0.0
APP_PORT=8080

# Database
APP_DB_PATH=./taskflow.db

# CORS
APP_CORS_ORIGINS=http://localhost:3000,http://localhost:5173

# SMTP / Email
APP_SMTP_HOST=smtp.gmail.com
APP_SMTP_PORT=587
APP_SMTP_USER=[email protected]
APP_SMTP_PASSWORD=your-app-password
APP_SMTP_FROM=[email protected]
APP_SMTP_USE_TLS=true

# Rate Limiting
APP_RATE_LIMIT_DEFAULT=100
APP_RATE_LIMIT_WINDOW=60

# Upload
APP_MAX_UPLOAD_SIZE=10485760
APP_UPLOAD_DIR=./uploads

Database & Models

TaskFlow uses SQLite with a real relational schema. The Database class wraps sqlite3 and is used as a scoped dependency (one connection per request, auto-closed).

Schema (models.py)

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT UNIQUE NOT NULL,
    email TEXT UNIQUE NOT NULL,
    password_hash TEXT NOT NULL,
    role TEXT NOT NULL DEFAULT 'member',
    avatar TEXT,
    is_active INTEGER NOT NULL DEFAULT 1,
    created_at REAL NOT NULL,
    updated_at REAL
);

CREATE TABLE IF NOT EXISTS projects (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    description TEXT DEFAULT '',
    owner_id INTEGER NOT NULL REFERENCES users(id),
    status TEXT NOT NULL DEFAULT 'active',
    created_at REAL NOT NULL,
    updated_at REAL
);

CREATE TABLE IF NOT EXISTS tasks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT NOT NULL,
    description TEXT DEFAULT '',
    project_id INTEGER NOT NULL REFERENCES projects(id),
    assignee_id INTEGER REFERENCES users(id),
    creator_id INTEGER NOT NULL REFERENCES users(id),
    status TEXT NOT NULL DEFAULT 'todo',
    priority TEXT NOT NULL DEFAULT 'medium',
    due_date TEXT,
    attachment TEXT,
    created_at REAL NOT NULL,
    updated_at REAL
);

CREATE TABLE IF NOT EXISTS comments (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id INTEGER NOT NULL REFERENCES tasks(id),
    user_id INTEGER NOT NULL REFERENCES users(id),
    body TEXT NOT NULL,
    created_at REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS notifications (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER REFERENCES users(id),
    type TEXT NOT NULL,
    recipient TEXT NOT NULL,
    subject TEXT NOT NULL,
    body TEXT,
    status TEXT NOT NULL DEFAULT 'pending',
    error TEXT,
    created_at REAL NOT NULL
);
"""

Database Class

import sqlite3
import time


class Database:
    """SQLite wrapper used as a scoped dependency (one per request)."""

    def __init__(self, db_path: str | None = None):
        # DB_PATH is the module-level default database path in models.py
        self.db_path = db_path or DB_PATH
        self.conn = sqlite3.connect(self.db_path)
        self.conn.row_factory = sqlite3.Row
        self.conn.execute("PRAGMA journal_mode=WAL")
        self.conn.execute("PRAGMA foreign_keys=ON")

    def execute(self, sql: str, params: tuple = ()) -> sqlite3.Cursor:
        return self.conn.execute(sql, params)

    def fetchone(self, sql: str, params: tuple = ()) -> dict | None:
        row = self.conn.execute(sql, params).fetchone()
        return dict(row) if row else None

    def fetchall(self, sql: str, params: tuple = ()) -> list[dict]:
        rows = self.conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

    def insert(self, table: str, data: dict) -> int:
        # Work on a copy so the caller's dict is not mutated
        data = {**data, 'created_at': time.time()}
        cols = ', '.join(data.keys())
        placeholders = ', '.join('?' for _ in data)
        cur = self.conn.execute(
            f"INSERT INTO {table} ({cols}) VALUES ({placeholders})",
            tuple(data.values())
        )
        self.conn.commit()
        return cur.lastrowid

    def update(self, table: str, row_id: int, data: dict) -> bool:
        # Work on a copy so callers (e.g. patch_user) do not see 'updated_at'
        data = {**data, 'updated_at': time.time()}
        sets = ', '.join(f"{k} = ?" for k in data)
        self.conn.execute(
            f"UPDATE {table} SET {sets} WHERE id = ?",
            (*data.values(), row_id)
        )
        self.conn.commit()
        return True

    def delete(self, table: str, row_id: int) -> bool:
        cur = self.conn.execute(f"DELETE FROM {table} WHERE id = ?", (row_id,))
        self.conn.commit()
        return cur.rowcount > 0

    def count(self, table: str) -> int:
        row = self.conn.execute(f"SELECT COUNT(*) as cnt FROM {table}").fetchone()
        return row['cnt']

    def close(self):
        self.conn.close()
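
The PRAGMA foreign_keys=ON line in the constructor matters because SQLite only enforces REFERENCES clauses when the pragma is enabled on the connection. The following sketch shows the difference on a reduced, in-memory version of the projects/tasks tables (the two-column schema here is a simplification for the demo, not the TaskFlow schema).

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE projects (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
CREATE TABLE tasks (
    id INTEGER PRIMARY KEY,
    title TEXT NOT NULL,
    project_id INTEGER NOT NULL REFERENCES projects(id)
);
""")

# Enforcement explicitly off: a task pointing at a missing project is accepted.
conn.execute("PRAGMA foreign_keys=OFF")
conn.execute("INSERT INTO tasks (title, project_id) VALUES (?, ?)", ('orphan', 999))
orphans_without_pragma = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]

# Clean up and commit; the pragma is a no-op inside an open transaction.
conn.execute("DELETE FROM tasks")
conn.commit()

# Enforcement on: the same insert now raises IntegrityError.
conn.execute("PRAGMA foreign_keys=ON")
try:
    conn.execute("INSERT INTO tasks (title, project_id) VALUES (?, ?)", ('orphan', 999))
    rejected = False
except sqlite3.IntegrityError:
    rejected = True
conn.close()
```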

Seed Data

def init_db(db_path: str = None):
    """Initialize the database schema and seed demo data."""
    path = db_path or DB_PATH
    conn = sqlite3.connect(path)
    conn.executescript(SCHEMA)

    count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    if count == 0:
        now = time.time()

        # Admin user (password: admin123)
        conn.execute(
            "INSERT INTO users (username, email, password_hash, role, created_at) "
            "VALUES (?, ?, ?, ?, ?)",
            ('admin', '[email protected]', hash_password('admin123'), 'admin', now)
        )
        # Regular users
        conn.execute(
            "INSERT INTO users (username, email, password_hash, role, created_at) "
            "VALUES (?, ?, ?, ?, ?)",
            ('alice', '[email protected]', hash_password('alice123'), 'member', now)
        )
        conn.execute(
            "INSERT INTO users (username, email, password_hash, role, created_at) "
            "VALUES (?, ?, ?, ?, ?)",
            ('bob', '[email protected]', hash_password('bob123'), 'member', now)
        )

        # Demo projects and tasks...
        conn.commit()
    conn.close()

Authentication

TaskFlow implements a complete authentication and authorization system: HMAC-SHA256 signed tokens, PBKDF2-SHA256 password hashing via Lcore's built-in hash_password() / verify_password(), a custom TokenAuthMiddleware, an AdminGuardHook for route-level RBAC, endpoint-level _require_admin() guards on write operations (create/update/delete projects and users), HTTP Basic Auth via @auth_basic, and signed session cookies.

Token Generation & Verification (auth.py)

import hashlib
import hmac
import time
import json
import base64


def generate_token(user_id: int, username: str, role: str) -> str:
    """Generate a signed auth token (base64 JSON + HMAC-SHA256)."""
    payload = {
        'sub': user_id,
        'username': username,
        'role': role,
        'iat': int(time.time()),
        'exp': int(time.time()) + 3600,
    }
    payload_b64 = base64.urlsafe_b64encode(
        json.dumps(payload).encode()
    ).decode()
    signature = hmac.new(
        _get_secret().encode(), payload_b64.encode(), hashlib.sha256
    ).hexdigest()
    return f"{payload_b64}.{signature}"


def verify_token(token: str) -> dict | None:
    """Verify and decode a signed auth token."""
    try:
        payload_b64, signature = token.rsplit('.', 1)
        expected_sig = hmac.new(
            _get_secret().encode(), payload_b64.encode(), hashlib.sha256
        ).hexdigest()
        if not hmac.compare_digest(signature, expected_sig):
            return None
        payload = json.loads(base64.urlsafe_b64decode(payload_b64))
        if payload.get('exp', 0) < time.time():
            return None
        return payload
    except Exception:
        return None
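
To see why the signature check protects the role claim, here is a self-contained round trip using the same base64 JSON + HMAC-SHA256 scheme. The sign and verify names and the explicit secret argument are stand-ins for this sketch; the functions above read the secret through _get_secret() instead.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def sign(payload: dict, secret: str) -> str:
    body = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()
    sig = hmac.new(secret.encode(), body.encode(), hashlib.sha256).hexdigest()
    return f"{body}.{sig}"


def verify(token: str, secret: str) -> Optional[dict]:
    try:
        body, sig = token.rsplit('.', 1)
        expected = hmac.new(secret.encode(), body.encode(), hashlib.sha256).hexdigest()
        if not hmac.compare_digest(sig, expected):
            return None
        payload = json.loads(base64.urlsafe_b64decode(body))
    except (ValueError, TypeError):
        return None  # malformed token
    if payload.get('exp', 0) < time.time():
        return None  # expired
    return payload


SECRET = 'demo-secret'  # hypothetical value, for this demo only
claims = {'sub': 2, 'role': 'member', 'exp': int(time.time()) + 3600}
token = sign(claims, SECRET)

# Swap in an escalated payload but keep the original signature.
forged_body = base64.urlsafe_b64encode(
    json.dumps({**claims, 'role': 'admin'}).encode()
).decode()
forged = f"{forged_body}.{token.rsplit('.', 1)[1]}"

# A correctly signed token whose expiry is already in the past.
expired = sign({**claims, 'exp': int(time.time()) - 1}, SECRET)
```

verify(token, SECRET) returns the claims, while the forged and expired tokens both yield None.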

TokenAuthMiddleware

class TokenAuthMiddleware(Middleware):
    """Validates Bearer tokens on protected API routes."""
    name = 'token_auth'
    order = 6

    def __init__(self, skip_paths=None):
        self.skip_paths = skip_paths or []

    def __call__(self, ctx, next_handler):
        path = ctx.request.path
        for skip in self.skip_paths:
            if skip == path:
                return next_handler(ctx)
            if len(skip) > 1 and skip.endswith('/') and path.startswith(skip):
                return next_handler(ctx)

        auth_header = ctx.request.get_header('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            ctx.response.status = 401
            return {'error': 'Missing or invalid Authorization header'}

        payload = verify_token(auth_header[7:])
        if not payload:
            ctx.response.status = 401
            return {'error': 'Invalid or expired token'}

        ctx.user = {
            'id': payload['sub'],
            'username': payload['username'],
            'role': payload['role'],
        }
        return next_handler(ctx)
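
The skip-path rule inside __call__ is easy to misread, so here it is isolated as a pure function (is_skipped is a hypothetical name for this sketch). Entries ending in '/' act as prefixes, all other entries must match exactly, and the len(skip) > 1 check keeps the bare '/' entry from matching every path.

```python
def is_skipped(path: str, skip_paths: list) -> bool:
    """Mirror of the skip logic in TokenAuthMiddleware.__call__."""
    for skip in skip_paths:
        if skip == path:
            return True  # exact match
        if len(skip) > 1 and skip.endswith('/') and path.startswith(skip):
            return True  # prefix match for entries like '/auth/'
    return False


# The skip list registered in app.py
SKIP = ['/auth/', '/health', '/docs', '/', '/static/', '/debug/', '/old-api']

for candidate in ['/', '/auth/login', '/health', '/healthz', '/api/users/']:
    print(candidate, is_skipped(candidate, SKIP))
```

With this list, '/auth/login' and '/static/app.js' bypass token checks, while '/healthz', '/docs/intro', and everything under '/api/' still require a Bearer token.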

AdminGuardHook (RBAC)

class AdminGuardHook(MiddlewareHook):
    """Restricts /admin/* to admin users."""
    name = 'admin_guard'
    order = 7

    def pre(self, ctx):
        if not ctx.request.path.startswith('/admin'):
            return None
        user = getattr(ctx, 'user', None)
        if not user or user.get('role') != 'admin':
            return HTTPResponse(
                body=json.dumps({'error': 'Admin access required'}),
                status=403,
                headers={'Content-Type': 'application/json'}
            )
        return None

    def post(self, ctx, result):
        return result

Endpoint-Level RBAC

Beyond the route-level AdminGuardHook, TaskFlow enforces role-based access directly in endpoint handlers. Write operations on projects and users are restricted to admin users, while members can update their own profile:

# Helper used in users.py and products.py
def _require_admin():
    """Abort 403 if current user is not an admin."""
    user = getattr(ctx, 'user', None)
    if not user or user.get('role') != 'admin':
        abort(403, 'Admin access required')


# Admin-only: create, patch, delete users
@users_app.post('/', name='create_user')
def create_user():
    _require_admin()
    # ...

# Admin or self: update own profile
@users_app.put('/<id:int>', name='update_user')
def update_user(id):
    user = getattr(ctx, 'user', None)
    if not user or (user.get('role') != 'admin' and user.get('id') != id):
        abort(403, 'Admin access required or can only update own profile')
    # ...

# Admin-only: create, update, delete projects
@projects_app.post('/', name='create_project')
def create_project():
    _require_admin()
    # ...

Endpoint                         Access Rule
POST /api/users/                 Admin only
PUT /api/users/<id>              Admin or self (own profile)
PATCH /api/users/<id>            Admin only
DELETE /api/users/<id>           Admin only
POST /api/projects/              Admin only
PUT /api/projects/<id>           Admin only
DELETE /api/projects/<id>        Admin only
GET endpoints, tasks, comments   Any authenticated user

HTTP Basic Auth & Signed Cookies

# HTTP Basic Auth on a specific route
@auth_app.get('/basic-demo')
@auth_basic(_check_basic_auth, realm='TaskFlow')
def basic_auth_demo():
    """Protected by HTTP Basic Auth."""
    return {'message': 'Authenticated via HTTP Basic Auth!'}

# Login sets a signed session cookie (HMAC-SHA256)
@auth_app.post('/login')
@rate_limit(5, per=300)
@validate_request(body={'username': str, 'password': str})
def login():
    """Authenticate and return a signed token + session cookie."""
    data = request.json
    db = Database()
    try:
        user = db.fetchone("SELECT * FROM users WHERE username = ?", (data['username'],))
        if not user or not verify_password(data['password'], user['password_hash']):
            abort(401, 'Invalid credentials')

        token = generate_token(user['id'], user['username'], user['role'])

        # Signed cookie for session-based auth
        response.set_cookie('session_user', user['username'],
                            secret=_get_secret(), path='/',
                            httponly=True, samesite='Lax', max_age=3600)

        return {
            'token': token,
            'user': {'id': user['id'], 'username': user['username'], 'role': user['role']}
        }
    finally:
        db.close()

# Read signed cookie
@auth_app.get('/me')
def me():
    """Get current user from signed session cookie."""
    username = request.get_cookie('session_user', secret=_get_secret())
    if not username:
        abort(401, 'Not authenticated')
    # ...

Middleware Stack

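TaskFlow activates 6 of Lcore's 7 built-in middleware (CSRFMiddleware is imported but left inactive) plus 4 custom components (TimingMiddleware, TokenAuthMiddleware, AdminGuardHook, and AuditLogHook), for a total of 10 active middleware ordered by priority.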

Order  Middleware                 Type      Purpose
0      BodyLimitMiddleware        Built-in  Reject oversized request bodies
1      RequestIDMiddleware        Built-in  Generate/propagate X-Request-ID
2      RequestLoggerMiddleware    Built-in  Structured JSON request logging
2      TimingMiddleware           Custom    X-Response-Time header
3      CORSMiddleware             Built-in  Full CORS with preflight
5      SecurityHeadersMiddleware  Built-in  HSTS, XSS, clickjacking headers
6      TokenAuthMiddleware        Custom    Bearer token validation
7      AdminGuardHook             Custom    RBAC for /admin/* routes
8      AuditLogHook               Custom    Log state-changing requests
90     CompressionMiddleware      Built-in  Gzip compression
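
As a mental model for the order column, the toy chain below sorts middleware by order and nests them so that the lowest order is outermost. This is only an illustration under that assumption: ToyMiddleware and build_chain are hypothetical, and Lcore's real dispatch code may differ.

```python
class ToyMiddleware:
    """Records when it is entered and left; not an Lcore class."""

    def __init__(self, name, order):
        self.name = name
        self.order = order

    def __call__(self, ctx, next_handler):
        ctx['trace'].append(f'{self.name}:in')
        result = next_handler(ctx)
        ctx['trace'].append(f'{self.name}:out')
        return result


def build_chain(middlewares, endpoint):
    """Wrap the endpoint so the lowest-order middleware runs first."""
    def wrap(mw, nxt):
        return lambda ctx: mw(ctx, nxt)

    handler = endpoint
    for mw in sorted(middlewares, key=lambda m: m.order, reverse=True):
        handler = wrap(mw, handler)
    return handler


def endpoint(ctx):
    ctx['trace'].append('endpoint')
    return {'ok': True}


stack = [
    ToyMiddleware('compression', 90),
    ToyMiddleware('body_limit', 0),
    ToyMiddleware('token_auth', 6),
]
ctx = {'trace': []}
result = build_chain(stack, endpoint)(ctx)
print(ctx['trace'])
```

In this model the request passes body_limit, then token_auth, then compression before reaching the endpoint, and the response unwinds in the reverse order.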

Registration Code (app.py)

import logging

from lcore import (
    Lcore,
    CORSMiddleware, SecurityHeadersMiddleware, CSRFMiddleware,
    RequestIDMiddleware, RequestLoggerMiddleware,
    BodyLimitMiddleware, CompressionMiddleware,
)
from modules.plugins import TimingMiddleware, AuditLogHook
from modules.auth import TokenAuthMiddleware, AdminGuardHook

app = Lcore()

# Built-in middleware
app.use(BodyLimitMiddleware(
    max_size=int(app.config.get('max_upload_size', 10_485_760))
))
app.use(RequestIDMiddleware())
app.use(RequestLoggerMiddleware(logger=logging.getLogger('http')))

# Custom timing middleware
app.use(TimingMiddleware())

# CORS with full configuration (cors_origins is defined elsewhere in app.py, not shown)
app.use(CORSMiddleware(
    allow_origins=cors_origins,
    allow_methods=['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'],
    allow_headers=['Content-Type', 'Authorization', 'X-CSRF-Token', 'X-Requested-With'],
    expose_headers=['X-Request-ID', 'X-Response-Time', 'X-API-Version'],
    allow_credentials=True,
    max_age=86400,
))

app.use(SecurityHeadersMiddleware(hsts=True, hsts_max_age=31536000))

# Custom auth middleware with path skipping
app.use(TokenAuthMiddleware(
    skip_paths=['/auth/', '/health', '/docs', '/', '/static/', '/debug/', '/old-api']
))
app.use(AdminGuardHook())
app.use(AuditLogHook())

# Compression (high order = runs last)
app.use(CompressionMiddleware(min_size=256, level=6))

CSRFMiddleware

CSRFMiddleware is imported but not activated. It is designed for server-rendered, form-based apps rather than JSON APIs that authenticate with Bearer tokens: browsers do not attach an Authorization header automatically, so a forged cross-site request arrives without credentials.

Custom Plugins

TaskFlow includes two custom plugins that demonstrate the setup(app) / apply(callback, route) plugin lifecycle.

APIVersionPlugin (plugins.py)

class APIVersionPlugin:
    """Adds API version info to every JSON response."""
    name = 'api_version'
    api = 2

    def __init__(self, version='0.0.1'):
        self.version = version

    def setup(self, app):
        """Called once when the plugin is installed."""
        self.app = app
        print(f"  [Plugin] APIVersionPlugin v{self.version} installed")

    def apply(self, callback, route):
        """Called for each route. Wraps the callback."""
        # Skip if route opts out
        if route.config.get('skip_versioning'):
            return callback

        version = self.version

        def wrapper(*args, **kwargs):
            result = callback(*args, **kwargs)
            if isinstance(result, dict):
                result['_api_version'] = version
            response.set_header('X-API-Version', version)
            return result

        return wrapper

RequestCounterPlugin (plugins.py)

class RequestCounterPlugin:
    """Tracks total request count per route."""
    name = 'request_counter'
    api = 2

    def __init__(self):
        self.counts = {}

    def setup(self, app):
        self.app = app

    def apply(self, callback, route):
        rule = route.rule
        self.counts.setdefault(rule, 0)
        counts = self.counts

        def wrapper(*args, **kwargs):
            counts[rule] = counts.get(rule, 0) + 1
            result = callback(*args, **kwargs)
            if isinstance(result, dict):
                result['_request_number'] = counts[rule]
            return result

        return wrapper

    def get_stats(self):
        return dict(self.counts)

    def close(self):
        """Called when plugin is uninstalled."""
        print(f"  [Plugin] RequestCounterPlugin closed. Final stats: {self.counts}")

Installing Plugins (app.py)

version_plugin = APIVersionPlugin(version='0.0.1')
app.install(version_plugin)

counter_plugin = RequestCounterPlugin()
app.install(counter_plugin)

Per-Route Config

Routes can opt out of specific plugins. The home page uses skip_versioning=True to prevent the version plugin from modifying its template response: @app.get('/', name='home', skip_versioning=True)
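
The opt-out can be modelled without the framework. In this sketch ToyRoute and install are hypothetical stand-ins for Lcore's route object and plugin installation; only the apply(callback, route) contract and the skip_versioning key come from the plugin code above.

```python
class ToyRoute:
    """Minimal stand-in for a route: a rule, a callback, and a config dict."""

    def __init__(self, rule, callback, **config):
        self.rule = rule
        self.callback = callback
        self.config = config


class VersionPlugin:
    def __init__(self, version):
        self.version = version

    def apply(self, callback, route):
        if route.config.get('skip_versioning'):
            return callback  # route opted out: leave the callback untouched

        def wrapper(*args, **kwargs):
            result = callback(*args, **kwargs)
            if isinstance(result, dict):
                result['_api_version'] = self.version
            return result

        return wrapper


def install(plugin, routes):
    """Apply the plugin once per route and index the results by rule."""
    return {route.rule: plugin.apply(route.callback, route) for route in routes}


routes = [
    ToyRoute('/', lambda: {'page': 'home'}, skip_versioning=True),
    ToyRoute('/health', lambda: {'status': 'ok'}),
]
handlers = install(VersionPlugin('0.0.1'), routes)
print(handlers['/']())
print(handlers['/health']())
```

The home route returns its dict unchanged, while /health gains the _api_version key.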

Module Mounting

TaskFlow is organized into four sub-applications, each defined as an independent Lcore() instance and mounted at a URL prefix. This demonstrates Lcore's modular architecture.

from modules.auth import auth_app
from modules.users import users_app
from modules.products import projects_app
from modules.notifications import notifications_app

app = Lcore()

# Mount sub-applications at URL prefixes
app.mount('/auth/', auth_app)
app.mount('/api/users/', users_app)
app.mount('/api/projects/', projects_app)
app.mount('/api/notifications/', notifications_app)

Prefix                Sub-App            Source File               Purpose
/auth/                auth_app           modules/auth.py           Login, register, session, Basic Auth
/api/users/           users_app          modules/users.py          User CRUD with all HTTP methods
/api/projects/        projects_app       modules/products.py       Projects, tasks, comments, uploads
/api/notifications/   notifications_app  modules/notifications.py  SMTP email, batch notify

Each sub-app is a fully independent Lcore() instance with its own routes, hooks, and middleware:

# modules/users.py
from lcore import Lcore, request

users_app = Lcore()

@users_app.hook('before_request')
def log_user_request():
    print(f"  [Users] {request.method} {request.path}")

@users_app.get('/', name='list_users')
def list_users():
    """List all team members."""
    # ...

Dependency Injection

TaskFlow registers three dependencies with different lifetimes, demonstrating all three DI modes.

import uuid
import time
from models import Database

class AppCache:
    """In-memory cache (singleton lifetime)."""
    def __init__(self):
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry and entry['expires'] > time.time():
            return entry['value']
        return None

    def set(self, key, value, ttl=300):
        self._store[key] = {'value': value, 'expires': time.time() + ttl}

    def invalidate(self, key):
        self._store.pop(key, None)

# Singleton: one instance for the entire app lifetime
app.inject('cache', AppCache, lifetime='singleton')

# Scoped: one instance per request, auto-closed via .close()
app.inject('db', lambda: Database(db_path), lifetime='scoped')

# Transient: new value every time it is accessed
app.inject('trace_id', lambda: str(uuid.uuid4()), lifetime='transient')

Access injected dependencies anywhere via ctx:

@app.get('/debug/di', name='debug_di')
def debug_di():
    """Dependency injection demo - shows different lifetimes."""
    return {
        'trace_id_1': ctx.trace_id,
        'trace_id_2': ctx.trace_id,  # Different each access (transient)
        'cache_type': type(ctx.cache).__name__,
        'db_type': type(ctx.db).__name__,
    }

Name          Lifetime   Type      Behavior
ctx.cache     singleton  AppCache  Same instance for all requests
ctx.db        scoped     Database  One per request, .close() called automatically
ctx.trace_id  transient  str       New UUID on every access
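
The three lifetimes can be reproduced in a few lines of plain Python. ToyContainer and ToyScope are hypothetical classes written for this sketch and are not Lcore's implementation; a scope here plays the role of one request's ctx.

```python
import uuid


class ToyContainer:
    """Registry of factories plus the singleton cache."""

    def __init__(self):
        self._factories = {}
        self._singletons = {}

    def inject(self, name, factory, lifetime):
        if lifetime not in ('singleton', 'scoped', 'transient'):
            raise ValueError(f'unknown lifetime: {lifetime}')
        self._factories[name] = (factory, lifetime)

    def new_scope(self):
        return ToyScope(self)


class ToyScope:
    """Stands in for a single request."""

    def __init__(self, container):
        self._container = container
        self._scoped = {}

    def get(self, name):
        factory, lifetime = self._container._factories[name]
        if lifetime == 'transient':
            return factory()  # never cached
        cache = self._container._singletons if lifetime == 'singleton' else self._scoped
        if name not in cache:
            cache[name] = factory()
        return cache[name]

    def close(self):
        # End of request: close scoped objects that expose .close()
        for obj in self._scoped.values():
            closer = getattr(obj, 'close', None)
            if callable(closer):
                closer()
        self._scoped.clear()


class FakeDb:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


container = ToyContainer()
container.inject('cache', dict, lifetime='singleton')
container.inject('db', FakeDb, lifetime='scoped')
container.inject('trace_id', lambda: str(uuid.uuid4()), lifetime='transient')

first, second = container.new_scope(), container.new_scope()
db = first.get('db')
first.close()
```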

CRUD Operations

The Users module demonstrates all five HTTP method decorators with typed route parameters, rate limiting, request validation, pagination, and search. Write operations (POST, PUT, PATCH, DELETE) are protected by admin-only guards, except PUT which also allows users to update their own profile.

GET — List with Pagination

@users_app.get('/', name='list_users')
@rate_limit(100, per=60)
def list_users():
    """List all team members with optional role filter and pagination."""
    db = _get_db()
    try:
        role = request.query.get('role')
        page = int(request.query.get('page', '1'))
        limit = int(request.query.get('limit', '20'))
        offset = (page - 1) * limit

        if role:
            users = db.fetchall(
                "SELECT id, username, email, role, is_active, created_at "
                "FROM users WHERE role = ? LIMIT ? OFFSET ?",
                (role, limit, offset)
            )
            total = db.fetchone("SELECT COUNT(*) as cnt FROM users WHERE role = ?", (role,))['cnt']
        else:
            users = db.fetchall(
                "SELECT id, username, email, role, is_active, created_at "
                "FROM users LIMIT ? OFFSET ?",
                (limit, offset)
            )
            total = db.count('users')

        return {'users': users, 'total': total, 'page': page, 'limit': limit}
    finally:
        db.close()
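
The handler above passes page and limit straight to int(), so a non-numeric or negative value would raise or produce a negative offset. One possible hardening is sketched below; parse_pagination and the max_limit cap of 100 are assumptions for illustration, not part of TaskFlow.

```python
def parse_pagination(query, default_limit=20, max_limit=100):
    """Return (page, limit, offset) with invalid input replaced by safe values."""

    def to_int(value, fallback):
        try:
            return int(value)
        except (TypeError, ValueError):
            return fallback

    page = max(1, to_int(query.get('page'), 1))
    limit = min(max_limit, max(1, to_int(query.get('limit'), default_limit)))
    return page, limit, (page - 1) * limit


print(parse_pagination({'page': '3', 'limit': '10'}))  # (3, 10, 20)
print(parse_pagination({'page': 'abc', 'limit': '-5'}))  # (1, 1, 0)
```

Any mapping with a .get() method works as the query argument, so the helper can be tested with a plain dict.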

POST — Create

@users_app.post('/', name='create_user')
@rate_limit(20, per=60)
@validate_request(body={'username': str, 'email': str, 'password': str})
def create_user():
    """Invite a new team member. Admin only."""
    _require_admin()
    data = request.json
    db = _get_db()
    try:
        if db.fetchone("SELECT id FROM users WHERE username = ?", (data['username'],)):
            abort(409, 'Username already exists')

        user_id = db.insert('users', {
            'username': data['username'],
            'email': data['email'],
            'password_hash': hash_password(data['password']),
            'role': data.get('role', 'member'),
            'is_active': 1,
        })
        response.status = 201
        return {'id': user_id, 'username': data['username'], 'created': True}
    finally:
        db.close()

GET — Read by ID

@users_app.get('/<id:int>', name='get_user')
@rate_limit(200, per=60)
def get_user(id):
    """Fetch a team member by ID."""
    db = _get_db()
    try:
        user = db.fetchone(
            "SELECT id, username, email, role, is_active, avatar, created_at "
            "FROM users WHERE id = ?",
            (id,)
        )
        if not user:
            abort(404, 'User not found')
        return user
    finally:
        db.close()

PUT — Full Update (Admin or Self)

@users_app.put('/<id:int>', name='update_user')
@rate_limit(50, per=60)
@validate_request(body={'username': str, 'email': str})
def update_user(id):
    """Full update of a user profile. Admin or self only."""
    user = getattr(ctx, 'user', None)
    if not user or (user.get('role') != 'admin' and user.get('id') != id):
        abort(403, 'Admin access required or can only update own profile')
    data = request.json
    db = _get_db()
    try:
        if not db.fetchone("SELECT id FROM users WHERE id = ?", (id,)):
            abort(404, 'User not found')
        db.update('users', id, {'username': data['username'], 'email': data['email']})
        return {'id': id, 'updated': True}
    finally:
        db.close()

PATCH — Partial Update

@users_app.patch('/<id:int>', name='patch_user')
@rate_limit(50, per=60)
def patch_user(id):
    """Partial update (change role, deactivate, etc.). Admin only."""
    _require_admin()
    data = request.json
    if not data:
        abort(400, 'No data provided')
    db = _get_db()
    try:
        if not db.fetchone("SELECT id FROM users WHERE id = ?", (id,)):
            abort(404, 'User not found')
        allowed = {'username', 'email', 'role', 'is_active'}
        update_data = {k: v for k, v in data.items() if k in allowed}
        if not update_data:
            abort(400, 'No valid fields to update')
        db.update('users', id, update_data)
        return {'id': id, 'patched': True, 'fields': list(update_data.keys())}
    finally:
        db.close()

DELETE — Remove

@users_app.delete('/<id:int>', name='delete_user')
@rate_limit(10, per=60)
def delete_user(id):
    """Remove a team member. Admin only."""
    _require_admin()
    db = _get_db()
    try:
        if not db.fetchone("SELECT id FROM users WHERE id = ?", (id,)):
            abort(404, 'User not found')
        db.delete('users', id)
        return {'id': id, 'deleted': True}
    finally:
        db.close()

File Uploads

Task attachments can be uploaded via multipart form data. Uploaded files are saved to disk and served via static_file().

Upload Handler (products.py)

@projects_app.post('/<project_id:int>/tasks/<task_id:int>/attachment', name='upload_attachment')
@rate_limit(10, per=60)
def upload_attachment(project_id, task_id):
    """Upload a file attachment to a task."""
    db = _get_db()
    try:
        if not db.fetchone("SELECT id FROM tasks WHERE id = ? AND project_id = ?",
                           (task_id, project_id)):
            abort(404, 'Task not found')

        upload = request.files.get('file')
        if not upload:
            abort(400, 'No file provided (field name: "file")')

        upload_dir = os.path.join(os.path.dirname(__file__), '..', 'uploads')
        os.makedirs(upload_dir, exist_ok=True)

        safe_name = f"task_{task_id}_{upload.filename}"
        upload.save(os.path.join(upload_dir, safe_name), overwrite=True)

        db.update('tasks', task_id, {'attachment': safe_name})
        return {
            'task_id': task_id,
            'filename': safe_name,
            'content_type': upload.content_type,
            'uploaded': True,
        }
    finally:
        db.close()
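
The stored name is built from upload.filename. The source does not state whether Lcore sanitizes that attribute, so if it does not, a defensive step along these lines could be applied before saving. safe_attachment_name is a hypothetical helper, not part of TaskFlow or Lcore.

```python
import os
import re


def safe_attachment_name(task_id: int, filename: str) -> str:
    """Strip directories and unusual characters from a client-supplied name."""
    # Normalise Windows separators, then keep only the final path component
    base = os.path.basename(filename.replace('\\', '/'))
    # Replace anything outside a conservative whitelist; drop leading dots
    base = re.sub(r'[^A-Za-z0-9._-]', '_', base).lstrip('.')
    return f"task_{task_id}_{base or 'upload'}"


print(safe_attachment_name(1, 'report.pdf'))        # task_1_report.pdf
print(safe_attachment_name(1, '../../etc/passwd'))  # task_1_passwd
```

Because the result never contains a path separator, joining it onto upload_dir cannot escape that directory.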

Serving Attachments

@projects_app.get('/attachments/<filename:path>', name='serve_attachment')
def serve_attachment(filename):
    """Serve uploaded task attachments. Demonstrates static_file()."""
    upload_dir = os.path.join(os.path.dirname(__file__), '..', 'uploads')
    return static_file(filename, root=upload_dir,
                       headers={'Cache-Control': 'max-age=3600'})

# Upload a file
curl -X POST -F "file=@report.pdf" \
  http://localhost:8080/api/projects/1/tasks/1/attachment \
  -H "Authorization: Bearer <token>"

# Download it
curl http://localhost:8080/api/projects/attachments/task_1_report.pdf \
  -H "Authorization: Bearer <token>"

Async Routes

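Lcore supports async def route handlers. TaskFlow uses asyncio.gather in the dashboard endpoint to schedule its two query coroutines together. Note that the sqlite3 calls themselves are synchronous, so the queries still execute one after the other; the await asyncio.sleep(0.01) calls stand in for real asynchronous I/O.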

import asyncio

@projects_app.get('/dashboard/overview')
async def dashboard_overview():
    """Project dashboard with aggregated stats."""
    db = _get_db()
    try:
        async def get_project_stats():
            await asyncio.sleep(0.01)
            return db.fetchall(
                """SELECT p.id, p.name, p.status,
                     COUNT(t.id) as task_count,
                     SUM(CASE WHEN t.status = 'done' THEN 1 ELSE 0 END) as done_count
                   FROM projects p LEFT JOIN tasks t ON p.id = t.project_id
                   GROUP BY p.id"""
            )

        async def get_overdue_tasks():
            await asyncio.sleep(0.01)
            return db.fetchall(
                """SELECT t.id, t.title, t.due_date, p.name as project_name
                   FROM tasks t JOIN projects p ON t.project_id = p.id
                   WHERE t.due_date < date('now') AND t.status != 'done'"""
            )

        projects, overdue = await asyncio.gather(
            get_project_stats(),
            get_overdue_tasks()
        )

        total_tasks = db.count('tasks')
        total_users = db.count('users')

        return {
            'projects': projects,
            'overdue_tasks': overdue,
            'totals': {
                'projects': len(projects),
                'tasks': total_tasks,
                'users': total_users,
                'overdue': len(overdue),
            }
        }
    finally:
        db.close()
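
Because db.fetchall is a blocking call, the coroutines above only give up control at asyncio.sleep. One way to get real overlap between blocking queries is to run each one in a worker thread with asyncio.to_thread (Python 3.9+). The sketch below is a standalone illustration built on the stdlib sqlite3 module and a throwaway database file; make_demo_db, run_query, and the two-table schema are hypothetical stand-ins and do not use TaskFlow's Database class.

```python
import asyncio
import os
import sqlite3
import tempfile


def make_demo_db() -> str:
    """Create a throwaway SQLite file with a tiny, hypothetical schema."""
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    conn = sqlite3.connect(path)
    try:
        conn.executescript(
            """
            CREATE TABLE projects (id INTEGER PRIMARY KEY, name TEXT);
            CREATE TABLE tasks (id INTEGER PRIMARY KEY, project_id INTEGER,
                                title TEXT, status TEXT);
            INSERT INTO projects (name) VALUES ('Alpha'), ('Beta');
            INSERT INTO tasks (project_id, title, status) VALUES
                (1, 'Design', 'done'), (1, 'Build', 'todo'), (2, 'Plan', 'todo');
            """
        )
        conn.commit()
    finally:
        conn.close()
    return path


def run_query(path: str, sql: str) -> list:
    """Blocking helper: open a connection in the calling thread and run one query."""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row
    try:
        return [dict(row) for row in conn.execute(sql)]
    finally:
        conn.close()


async def dashboard(path: str) -> dict:
    # Each blocking query runs in its own worker thread, so the two can overlap
    # while the event loop stays free to serve other coroutines.
    projects, open_tasks = await asyncio.gather(
        asyncio.to_thread(run_query, path,
                          "SELECT id, name FROM projects ORDER BY id"),
        asyncio.to_thread(run_query, path,
                          "SELECT id, title FROM tasks "
                          "WHERE status != 'done' ORDER BY id"),
    )
    return {"projects": projects, "open_tasks": open_tasks}
```

Each thread opens its own connection because sqlite3 connections are, by default, restricted to the thread that created them.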

SMTP Email

The notifications module includes a full EmailService that sends emails via Python's smtplib. When SMTP credentials are not configured, it runs in simulated mode, logging emails to the database without sending them.

EmailService (notifications.py)

import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart


class EmailService:
    """SMTP email sender. Simulates when credentials aren't configured."""

    def __init__(self):
        self.host = os.environ.get('APP_SMTP_HOST', 'smtp.gmail.com')
        self.port = int(os.environ.get('APP_SMTP_PORT', '587'))
        self.user = os.environ.get('APP_SMTP_USER', '')
        self.password = os.environ.get('APP_SMTP_PASSWORD', '')
        self.from_addr = os.environ.get('APP_SMTP_FROM', '[email protected]')
        self.use_tls = os.environ.get('APP_SMTP_USE_TLS', 'true').lower() == 'true'

    def send(self, to: str, subject: str, body: str, html: bool = False) -> dict:
        msg = MIMEMultipart('alternative')
        msg['From'] = self.from_addr
        msg['To'] = to
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'html' if html else 'plain'))

        db = Database()
        try:
            if not self.user or self.user == '[email protected]':
                # Simulate - no real SMTP configured
                db.insert('notifications', {
                    'type': 'email', 'recipient': to, 'subject': subject,
                    'body': body, 'status': 'simulated',
                })
                return {
                    'status': 'simulated', 'to': to, 'subject': subject,
                    'note': 'SMTP not configured - email simulated.',
                }

            # Real SMTP. The context manager closes the connection
            # even if starttls, login, or sendmail raises.
            with smtplib.SMTP(self.host, self.port, timeout=10) as server:
                if self.use_tls:
                    server.starttls()
                server.login(self.user, self.password)
                server.sendmail(self.from_addr, [to], msg.as_string())

            db.insert('notifications', {
                'type': 'email', 'recipient': to, 'subject': subject,
                'body': body, 'status': 'sent',
            })
            return {'status': 'sent', 'to': to, 'subject': subject}

        except Exception as e:
            db.insert('notifications', {
                'type': 'email', 'recipient': to, 'subject': subject,
                'status': 'failed', 'error': str(e),
            })
            return {'status': 'failed', 'to': to, 'error': str(e)}
        finally:
            db.close()
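
The service builds a multipart/alternative message but attaches a single part. That container type is meant to hold several renderings of the same content, ordered from plainest to richest, so that mail clients can pick the best one they support. A minimal sketch of that variation, using only the stdlib email package; build_message and its parameters are hypothetical and not part of notifications.py:

```python
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def build_message(from_addr, to, subject, text_body, html_body=None):
    """Build a multipart/alternative message with a plain-text fallback."""
    msg = MIMEMultipart("alternative")
    msg["From"] = from_addr
    msg["To"] = to
    msg["Subject"] = subject
    # The plain-text part goes first; clients prefer the last part they can render.
    msg.attach(MIMEText(text_body, "plain"))
    if html_body is not None:
        msg.attach(MIMEText(html_body, "html"))
    return msg
```

The returned object can be passed to sendmail via msg.as_string(), exactly as in the service above.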

Usage in Routes

@notifications_app.post('/email', name='send_email')
@rate_limit(10, per=60)
@validate_request(body={'to': str, 'subject': str, 'body': str})
def send_email():
    """Send an email notification via SMTP."""
    data = request.json
    svc = EmailService()
    return svc.send(data['to'], data['subject'], data['body'], data.get('html', False))


@notifications_app.post('/task-assigned/<task_id:int>', name='notify_assignment')
@rate_limit(20, per=60)
def notify_assignment(task_id):
    """Notify a user they've been assigned a task."""
    db = Database()
    try:
        task = db.fetchone(
            """SELECT t.*, u.email as assignee_email, u.username as assignee_name,
                 p.name as project_name
               FROM tasks t
               JOIN users u ON t.assignee_id = u.id
               JOIN projects p ON t.project_id = p.id
               WHERE t.id = ?""",
            (task_id,)
        )
        if not task:
            abort(404, 'Task not found or has no assignee')

        svc = EmailService()
        return svc.send(
            to=task['assignee_email'],
            subject=f"[TaskFlow] You've been assigned: {task['title']}",
            body=f"<h2>New Task Assignment</h2>...",
            html=True,
        )
    finally:
        db.close()

Templates

Lcore's .tpl files use SimpleTemplate, the built-in template engine. SimpleTemplate files are standard HTML with embedded Python expressions and control flow. There is no special compilation step — they are rendered at request time via template('name', key=value).

Syntax Reference

| Syntax | Purpose | Example |
| --- | --- | --- |
| {{variable}} | Output (HTML-escaped) | {{title}} |
| {{!variable}} | Output (raw/unescaped) | {{!raw_html}} |
| % for x in y: | Loop | % for item in items: |
| % if cond: | Conditional | % if user: |
| % end | End block | % end |
| % include('x') | Include another template | % include('footer') |
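
The practical difference between {{variable}} and {{!variable}} is whether the value is HTML-escaped before it reaches the page. The sketch below imitates the two behaviours with the stdlib html.escape function. It illustrates the concept only and is not how SimpleTemplate is implemented; render_escaped and render_raw are hypothetical names.

```python
import html


def render_escaped(value) -> str:
    """Roughly what {{value}} is described as doing: escape HTML metacharacters."""
    return html.escape(str(value))


def render_raw(value) -> str:
    """Roughly what {{!value}} is described as doing: emit the value untouched."""
    return str(value)


user_input = '<script>alert("x")</script>'
print(render_escaped(user_input))  # safe to embed in a page
print(render_raw(user_input))      # would execute as markup in a browser
```

The raw form is best reserved for markup the application generated itself, never for user-supplied values.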

Welcome Page Template (welcome.tpl)

<!DOCTYPE html>
<html>
<head>
  <title>{{title}}</title>
  <style>
    /* ... CSS styles ... */
  </style>
</head>
<body>
  <div class="container">
    <h1>{{title}}</h1>
    <p class="subtitle">{{description}}</p>

    <div class="features">
      % for feature in features:
      <div class="feature">
        <h3>{{feature['name']}}</h3>
        <p>{{feature['desc']}}</p>
      </div>
      % end
    </div>

    <div class="endpoints">
      % for group in endpoint_groups:
      <div class="endpoint-group">
        <div class="group-title">{{group['name']}}</div>
        % for ep in group['endpoints']:
        <div class="endpoint">
          <span class="method {{ep['method'].lower()}}">{{ep['method']}}</span>
          <span class="path">{{ep['path']}}</span>
          <span class="desc">{{ep['desc']}}</span>
        </div>
        % end
      </div>
      % end
    </div>

    <div class="footer">
      Lcore Framework v{{version}} &middot; Running on {{host}}:{{port}}
    </div>
  </div>
</body>
</html>

Rendering from a Route (app.py)

import os

from lcore import template, TEMPLATE_PATH

# Register template directory
TEMPLATE_PATH.insert(0, os.path.join(os.path.dirname(__file__), 'templates'))

@app.get('/', name='home', skip_versioning=True)
def home():
    """TaskFlow welcome page with API overview."""
    return template('welcome',
        title='TaskFlow API',
        description='Project management API built with the Lcore framework',
        version='0.0.1',
        features=[
            {'name': 'SQLite DB', 'desc': 'Real database with projects, tasks, users, comments'},
            {'name': '.env Config', 'desc': 'Multi-source config loading + dataclass validation'},
            # ...
        ],
        endpoint_groups=[
            {'name': 'Auth', 'endpoints': [
                {'method': 'POST', 'path': '/auth/login', 'desc': 'Login with credentials'},
                # ...
            ]},
        ],
    )

Lifecycle Hooks

TaskFlow uses five lifecycle hooks to inject behavior at different points in the request lifecycle.

| Hook | Where Used | Purpose |
| --- | --- | --- |
| on_request_start | app.py | Record request start time in ctx.state |
| before_request | app.py, users.py | Log incoming request method and path |
| after_request | app.py | Add X-Powered-By: Lcore header |
| on_response_send | app.py | Log response status code and duration |
| on_module_mount | app.py | Log when sub-applications are mounted |

@app.hook('on_request_start')
def on_request_start():
    ctx.state['start_time'] = time.time()

@app.hook('before_request')
def before_request():
    logger.info(f"-> {request.method} {request.path}")

@app.hook('after_request')
def after_request():
    response.set_header('X-Powered-By', 'Lcore')

@app.hook('on_response_send')
def on_response_send():
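    # Fall back to "now" so a missing start time reports ~0 ms instead of time since the epoch
    start = ctx.state.get('start_time', time.time())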
    duration = (time.time() - start) * 1000
    logger.info(f"<- {request.method} {request.path} [{response.status_code}] {duration:.1f}ms")

@app.hook('on_module_mount')
def on_module_mount(prefix, child):
    logger.info(f"  Mounted module at {prefix}")

The before_request hook is also used at the module level in users.py:

# modules/users.py
@users_app.hook('before_request')
def log_user_request():
    print(f"  [Users] {request.method} {request.path}")
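
The on_request_start and on_response_send hooks above amount to a stopwatch around each request. A standalone sketch of that pattern follows, with a plain dict standing in for ctx.state. It uses time.perf_counter, which is monotonic and therefore better suited to measuring durations than time.time. The function names are hypothetical and the sketch does not use Lcore's hook API.

```python
import time


def mark_start(state: dict) -> None:
    """Record the start of a request in the per-request state."""
    state["start_time"] = time.perf_counter()


def elapsed_ms(state: dict) -> float:
    """Milliseconds since mark_start, or 0.0 if no start was recorded."""
    start = state.get("start_time")
    if start is None:
        return 0.0
    return (time.perf_counter() - start) * 1000.0


state = {}
mark_start(state)
time.sleep(0.01)  # stand-in for the route handler doing its work
print(f"request took {elapsed_ms(state):.1f}ms")
```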

Error Handling

TaskFlow registers custom error handlers for HTTP 404, 500, 429, and 403. The 404 and 500 handlers perform content negotiation: they return JSON if the client sends Accept: application/json, or render an HTML template otherwise.

@app.error(404)
def error_404(error):
    if 'application/json' in (request.get_header('Accept') or ''):
        return {'error': 'Not Found', 'path': request.path, 'status': 404}
    return template('error', status_code=404, status_text='Not Found',
                     message=f"The path '{request.path}' does not exist.")

@app.error(500)
def error_500(error):
    if 'application/json' in (request.get_header('Accept') or ''):
        return {'error': 'Internal Server Error', 'status': 500}
    return template('error', status_code=500, status_text='Internal Server Error',
                     message='Something went wrong. Please try again later.')

@app.error(429)
def error_429(error):
    return {'error': 'Too Many Requests', 'status': 429,
            'message': 'Rate limit exceeded. Please slow down.'}

@app.error(403)
def error_403(error):
    return {'error': 'Forbidden', 'status': 403,
            'message': 'You do not have permission to access this resource.'}
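
The substring check on the Accept header works for simple clients, but it also matches a header such as application/json;q=0, which explicitly refuses JSON. A slightly stricter sketch that parses media ranges and quality values with the stdlib only; wants_json is a hypothetical helper and not part of Lcore:

```python
def wants_json(accept_header) -> bool:
    """Return True if the Accept header lists application/json with q > 0."""
    for item in (accept_header or "").split(","):
        parts = [p.strip() for p in item.split(";")]
        if parts[0].lower() != "application/json":
            continue
        quality = 1.0  # default quality when no q parameter is given
        for param in parts[1:]:
            name, _, value = param.partition("=")
            if name.strip().lower() == "q":
                try:
                    quality = float(value)
                except ValueError:
                    quality = 0.0  # treat a malformed q value as a refusal
        return quality > 0
    return False
```

Wildcards such as */* are deliberately ignored here, so browsers that send them still receive the HTML error page.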

Error Template (error.tpl)

<!DOCTYPE html>
<html>
<head>
  <title>Error {{status_code}}</title>
</head>
<body>
  <div class="card">
    <h1>{{status_code}}</h1>
    <h2>{{status_text}}</h2>
    <p>{{message}}</p>
    <p><a href="/">Back to Home</a></p>
  </div>
</body>
</html>

Auto API Docs

When running in debug mode, TaskFlow enables Lcore's built-in API documentation at /docs. This auto-generates an interactive page listing all registered routes with their methods, parameters, and docstrings.

is_debug = str(app.config.get('debug', 'false')).lower() in ('true', '1', 'yes')
if is_debug:
    app.enable_docs()
    logger.info("  API docs at /docs")

This creates two endpoints:

| Path | Description |
| --- | --- |
| /docs | Interactive HTML documentation page |
| /docs/json | Raw JSON API schema |

Security

API docs are only enabled when debug=true in the configuration. In production, they are disabled by default to avoid exposing your route structure publicly.
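
The debug check above compares strings because values loaded from a .env file or from environment variables arrive as text. A small sketch of the same normalisation as a reusable helper; as_bool is a hypothetical name and not an Lcore function:

```python
def as_bool(value, default: bool = False) -> bool:
    """Interpret common truthy strings ('true', '1', 'yes') as True."""
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() in ("true", "1", "yes")
```

With such a helper, the check could be written as as_bool(app.config.get('debug')).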

Frontend Application

TaskFlow includes a complete single-page frontend at /frontend, built entirely with Lcore's SimpleTemplate engine. No build tools, no npm, no JavaScript frameworks — just HTML, CSS, and vanilla JavaScript served directly by the backend as a .tpl template.

Pages

| Page | Description |
| --- | --- |
| Login | Demo credential quick-fill, token stored in localStorage |
| Dashboard | Stats overview (projects, tasks, team, overdue), project summary table, recent activity feed |
| Projects | List all projects, create/delete (admin only) |
| Project Detail | Kanban task board (To Do / In Progress / Done), create new tasks with assignee, priority, due date |
| Task Detail | Full task view, status change buttons, comments thread, delete (admin only) |
| Team | Member list with role badges, add/remove members (admin only) |
| Notifications | Email history log, compose and send emails |
| My Profile | View and edit own username and email, account info and permissions summary |

Role-Based UI

The frontend checks state.user.role to conditionally show admin-only UI elements:

function isAdmin() {
  return state.user && state.user.role === 'admin';
}

// Only admins see the create/delete buttons
document.getElementById('topbar-actions').innerHTML =
  isAdmin() ? '<button onclick="openNewProjectModal()">+ New Project</button>' : '';

// Members see projects and tasks, but no destructive actions
if (isAdmin()) html += '<button onclick="deleteProject(id)">Delete</button>';

Backend enforcement matches the UI: even if a member bypasses the frontend and calls the API directly, _require_admin() returns 403 Forbidden. The profile update (PUT /api/users/<id>) is the one exception: it is allowed for an admin or for the user whose own account is being updated.
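
The two server-side rules described above (admin-only writes, and admin-or-self for profile updates) can be sketched as plain checks. This is an illustration only: the user dict shape, PermissionDenied, require_admin, and can_update_user are hypothetical, and TaskFlow's actual _require_admin() is not shown here.

```python
class PermissionDenied(Exception):
    """Raised when the current user may not perform an action (maps to HTTP 403)."""
    status_code = 403


def require_admin(user) -> None:
    """Raise unless the current user has the admin role."""
    if not user or user.get("role") != "admin":
        raise PermissionDenied("Admin role required")


def can_update_user(user, target_user_id: int) -> bool:
    """Admins may update anyone; members may update only their own account."""
    if not user:
        return False
    return user.get("role") == "admin" or user.get("id") == target_user_id
```

Keeping the rule in one function means the UI check and the API check cannot drift apart silently; the frontend's isAdmin() remains a convenience, not a security boundary.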

How It Works

The frontend is a single frontend.tpl file served by a Lcore route:

@app.get('/frontend', name='frontend', skip_versioning=True)
def frontend():
    """TaskFlow frontend — full project management UI."""
    return template('frontend')

It uses fetch() to call the same API endpoints, with the Bearer token stored in localStorage for persistent login across page refreshes.
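
Any HTTP client can call the API the same way the frontend's fetch() calls do, by sending the token in an Authorization header. A sketch in Python with the stdlib urllib module; build_request is a hypothetical helper, and the default base URL is the Quick Start address. The request is only constructed here, not sent.

```python
import urllib.request


def build_request(path: str, token: str,
                  base_url: str = "http://localhost:8080") -> urllib.request.Request:
    """Build a GET request that carries the Bearer token, as the frontend does."""
    req = urllib.request.Request(base_url + path, method="GET")
    req.add_header("Authorization", f"Bearer {token}")
    req.add_header("Accept", "application/json")
    return req
```

Passing the result to urllib.request.urlopen would perform the call against a running server.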

Test Results

TaskFlow includes a test suite of 30 checks that exercises the API end to end. Every test passes.

  PASS: Unauthenticated = 401
  PASS: Login
  PASS: Register
  PASS: Basic Auth
  PASS: Health
  PASS: List users
  PASS: Get user
  PASS: Create user
  PASS: Patch user
  PASS: Search users
  PASS: List projects
  PASS: Get project
  PASS: Create project
  PASS: List tasks
  PASS: Create task
  PASS: Update task
  PASS: Get task
  PASS: Add comment
  PASS: Send email
  PASS: Batch notify
  PASS: History
  PASS: Dashboard (async)
  PASS: Activity
  PASS: Admin (admin)
  PASS: Admin (member=403)
  PASS: Routes
  PASS: Middleware
  PASS: Config
  PASS: Stats
  PASS: Redirect 301

  30 passed, 0 failed

Test Coverage by Feature

| Category | Tests | What is Verified |
| --- | --- | --- |
| Authentication | 4 | 401 without token, login, register, Basic Auth |
| System | 1 | Health check endpoint |
| Users CRUD | 5 | List, get, create, patch, search |
| Projects & Tasks | 8 | List/get/create projects, list/create/update/get tasks, add comment |
| Notifications | 3 | Send email, batch notify (async), history |
| Dashboard | 2 | Dashboard overview (async), activity feed |
| Admin & RBAC | 2 | Admin access with admin token, 403 with member token. Endpoint-level guards on user/project write operations |
| Debug | 4 | Routes, middleware, config (redacted), stats |
| Redirect | 1 | 301 redirect from /old-api to /api/projects/ |

WebSocket Note

Lcore does not natively support WebSockets. Lcore is a WSGI (Web Server Gateway Interface) framework, and WSGI is fundamentally a synchronous request/response interface. Each WSGI request follows a strict cycle: the server receives a request, passes it to the application, the application returns a response, and the cycle ends.

WebSockets require persistent, bidirectional connections where both the client and server can send messages at any time. This model is incompatible with WSGI's one-request-one-response architecture. WSGI has no mechanism for keeping a connection open or pushing data to the client outside of a response.
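
The request/response cycle described above is visible in the shape of a WSGI application: a callable that receives the request environ, calls start_response once, and returns the body. A minimal sketch using only the stdlib wsgiref helpers; it does not use Lcore, and app and call_once are hypothetical names.

```python
from wsgiref.util import setup_testing_defaults


def app(environ, start_response):
    """A complete WSGI application: one request in, one response out."""
    body = b"hello"
    start_response("200 OK", [("Content-Type", "text/plain"),
                              ("Content-Length", str(len(body)))])
    return [body]


def call_once(wsgi_app, path="/"):
    """Play the server's role for a single request and collect the response."""
    environ = {}
    setup_testing_defaults(environ)  # fills in a minimal valid WSGI environ
    environ["PATH_INFO"] = path
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(wsgi_app(environ, start_response))
    return captured["status"], body


print(call_once(app))
```

Once the returned iterable is exhausted, the server has nothing more to ask of the application, which is why there is no place in this interface for pushing later messages to the client.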

Coming Soon

A companion WebSocket library designed to work alongside the Lcore framework is currently in development and will be released in a later phase. This library will allow you to run a WebSocket server side-by-side with your Lcore application, sharing configuration, authentication, and application logic seamlessly.

In the meantime, if your application needs WebSocket support, you have two options:

| Approach | Description |
| --- | --- |
| Separate WebSocket server | Run a dedicated WebSocket server (e.g., websockets, socket.io) alongside Lcore on a different port |
| ASGI framework | Use an ASGI-based framework (e.g., Starlette, FastAPI) that natively supports both HTTP and WebSocket protocols |

Why Not ASGI?

Lcore is intentionally WSGI-based for simplicity, broad server compatibility (Gunicorn, Waitress, uWSGI, etc.), and a straightforward programming model. WSGI remains the most widely deployed Python web standard. For the vast majority of REST APIs, WSGI is more than sufficient.