Prefect pipeline with Agentic LLM Log Analysis

Introduction

This project is less about the data pipeline itself and more about what happens when things go wrong. It focuses on simulating errors and having an LLM analyse them, so that each failure comes with a reasonable explanation of what happened in the pipeline.

The currency pipeline intentionally fails, and that’s the point. Every error it throws gets picked up by a local LLM agent that reads the logs, figures out what broke, and sends a diagnosis by email.

Architecture

The system has 3 main layers:

  • Source: Frankfurter API, a free open API for EUR exchange rates.
  • Data Pipeline & Observability: Prefect orchestrates the flow; logs are pushed to Loki/Grafana.
  • LLM Agent: A hook queries Loki for errors, sends them to Ollama (Gemma3 running locally), and sends an email with the diagnosis.

Requirements

  • Python 3.13+
  • Docker & Docker Compose
  • Ollama installed locally
  • Gmail account (or any SMTP) for email notifications

Step 1 - Clone and install

git clone https://github.com/lucnietoX/logging-agent-llm

Create and activate a virtual environment:

python -m venv .venv
source .venv/bin/activate  # on macOS/Linux

Install dependencies:

pip install -r requirements.txt

Step 2 - Configure environment variables

Copy the template and fill in your credentials:

cp template-env .env

Edit .env:

SMTP_SERVER="smtp.gmail.com"
SMTP_PORT=587
SENDER_EMAIL=your@gmail.com
SENDER_PASSWORD=your_app_password
RECEIVER_EMAIL=receiver@gmail.com
LOKI_URL="http://localhost:3100/loki/api/v1/query_range"
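
Before starting the services, it can help to confirm that every variable above is actually set. This is a minimal sketch, assuming the variables have already been loaded into the process environment (for example by exporting them in the shell or through a dotenv loader). The helper name missing_settings is hypothetical and not part of the repository:

```python
import os

# Variable names taken from the .env template above.
REQUIRED_VARS = [
    "SMTP_SERVER",
    "SMTP_PORT",
    "SENDER_EMAIL",
    "SENDER_PASSWORD",
    "RECEIVER_EMAIL",
    "LOKI_URL",
]


def missing_settings(env=None) -> list[str]:
    """Return the required variable names that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]


# Example: an environment that only defines the SMTP server.
print(missing_settings({"SMTP_SERVER": "smtp.gmail.com"}))
```

Calling missing_settings() with no argument checks the real environment, so a non-empty result is a signal to fix .env before running the flows.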

Step 3 - Start the infrastructure

Download and install the Gemma3 model.

I chose Gemma3, a model provided by Google (https://ollama.com/library/gemma3).

Command:

ollama run gemma3:4b

List the models currently loaded in Ollama:

ollama ps

Start the Ollama server if it is not running:

ollama serve

Start the Docker services:

docker compose up -d

Wait for all services to be healthy:

docker compose ps

Once the services defined in docker-compose.yml are up, verify that the Prefect UI is running at http://localhost:4200.

Verify that Grafana is running at http://localhost:3000 (default credentials: admin / admin).
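
The same checks can be scripted. The sketch below assumes the stock health endpoints of each service (/api/health for Prefect and Grafana, /ready for Loki) on the ports used in this guide. If your docker-compose.yml maps different ports, adjust the URLs. The function names are hypothetical:

```python
import urllib.error
import urllib.request

# Assumed health endpoints on the default ports used in this guide.
HEALTH_URLS = {
    "prefect": "http://localhost:4200/api/health",
    "loki": "http://localhost:3100/ready",
    "grafana": "http://localhost:3000/api/health",
}


def is_up(url: str, timeout: float = 3.0, opener=urllib.request.urlopen) -> bool:
    """Return True if the URL answers with HTTP 200, False on any connection error."""
    try:
        with opener(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        return False


def check_services(opener=urllib.request.urlopen) -> dict[str, bool]:
    """Probe every service once and report which ones responded."""
    return {name: is_up(url, opener=opener) for name, url in HEALTH_URLS.items()}
```

The opener parameter exists only so the function can be exercised without a running stack. In normal use, check_services() is called with no arguments.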


Step 4 - Understanding the code

Loki Handler

loki_handler.py sets up a custom Python logger that ships every log line to Loki with two structured labels, application and flow, which make filtering in Grafana straightforward:

# dataflow/utils/loki_handler.py
import logging

import logging_loki

# LOKI_SERVER is assumed to be defined elsewhere (not shown in this excerpt).


def setup_loki_handler(application_name: str, flow_name: str) -> logging.Logger:
    # Handler that pushes each record to Loki, labelled by application and flow.
    loki_handler = logging_loki.LokiHandler(
        url=LOKI_SERVER,
        tags={
            "application": application_name,
            "flow": flow_name,
        },
        version="1",
    )
    # Second handler so the same records also appear in the console.
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.DEBUG)
    loki_handler.setLevel(logging.DEBUG)

    logger = logging.getLogger(flow_name)
    logger.addHandler(loki_handler)
    logger.addHandler(stream_handler)
    logger.setLevel(logging.DEBUG)
    return logger

Every task in the pipeline uses this logger instead of Prefect’s default logger. This is what makes the logs queryable in Loki.
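
One thing to watch with this pattern: logging.getLogger(flow_name) returns the same logger object for the same name, so calling the setup function twice attaches the handlers twice and every line is emitted twice. Here is a minimal sketch of a guard, using only the standard library with a StreamHandler standing in for the Loki handler. The function name get_flow_logger is hypothetical:

```python
import logging


def get_flow_logger(flow_name: str) -> logging.Logger:
    """Return the logger for a flow, attaching handlers only on the first call."""
    logger = logging.getLogger(flow_name)
    if not logger.handlers:
        handler = logging.StreamHandler()  # stand-in for the Loki handler
        handler.setLevel(logging.DEBUG)
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
    return logger


first = get_flow_logger("currency-exchange-pipeline")
second = get_flow_logger("currency-exchange-pipeline")
print(first is second, len(first.handlers))
```

The check on logger.handlers keeps the function safe to call from several modules that all want the same flow logger.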

Prefect Pipeline

"""
Prefect Data Flow 1 - Currency Exchange Pipeline
API: frankfurter.app
Output: data/currency_rates.csv

Simulate 2 scenarios:
  1. Success: fetch real exchange rates EUR → USD, GBP, BRL, JPY, CHF
  2. Forced Error: attempt to fetch invalid currency (XXX) → controlled failure
"""

import requests
import pandas as pd
import random
from datetime import datetime, timezone
from pathlib import Path
from prefect import flow, task
from dataflow.utils.loki_handler import setup_loki_handler

BASE_URL = "https://api.frankfurter.app"
BASE_CCY = "EUR"
TARGET_CCY = ["USD", "GBP", "BRL", "JPY", "CHF"]
OUTPUT_DIR = Path("data")
OUTPUT_FILE = OUTPUT_DIR / "currency_rates.csv"
ERROR_RATE = 0.3

logger = setup_loki_handler("logging-agent-llm", "currency-exchange-pipeline")
logger.info("Pipeline started at %s", datetime.now().isoformat())

# ----- Tasks -----

# The tasks below are the individual steps of the pipeline.


@task(name="fetch_exchange_rates", retries=1, retry_delay_seconds=5)
def fetch_rates(base: str, targets: list[str]) -> dict:
    """Fetch exchange rates from the API."""
    logger.info("Fetching rates: %s → %s", base, targets)
    symbols = ",".join(targets)
    url = f"{BASE_URL}/latest?from={base}&to={symbols}"

    logger.info("Requesting URL: %s", url)
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error("Error fetching exchange rates: %s", e)
        raise
    logger.info("Successfully fetched rates! Status code: %d", response.status_code)
    data = response.json()
    logger.info("Rates fetched for date: %s", data.get("date"))
    return data


@task(name="inject_error_simulation")
def maybe_inject_error() -> bool:
    """Simulate error injection with probability ERROR_RATE."""
    if random.random() < ERROR_RATE:
        url = f"{BASE_URL}/latest?from=EUR&to=XXX"
        response = requests.get(url, timeout=10)
        logger.warning(
            "Error received for url %s: %s - Status %d",
            url,
            response.text,
            response.status_code,
        )
        response.raise_for_status()
        return True

    logger.info("No error injected this run")
    return False


@task(name="transform_to_dataframe")
def transform(data: dict) -> pd.DataFrame:
    """Transform raw API data into a DataFrame."""
    rows = []
    for currency, rate in data["rates"].items():
        rows.append(
            {
                "extracted_at": datetime.now(timezone.utc).isoformat(),
                "base_currency": data["base"],
                "target_currency": currency,
                "rate": rate,
                "date": data["date"],
            }
        )

    df = pd.DataFrame(rows)
    logger.info("Transformed %d rows", len(df))
    return df


@task(name="save_to_csv")
def save_csv(df: pd.DataFrame) -> str:
    """Save DataFrame to CSV, appending if file exists."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Append to existing file if it exists, otherwise create new
    if OUTPUT_FILE.exists():
        existing = pd.read_csv(OUTPUT_FILE)
        df = pd.concat([existing, df], ignore_index=True)

    df.to_csv(OUTPUT_FILE, index=False)
    logger.info("File saved %d total rows to %s", len(df), OUTPUT_FILE)
    return str(OUTPUT_FILE)


# ----- Flow -----


@flow(
    name="currency-exchange-pipeline",
    description="Fetches EUR exchange rates and saves to CSV. Simulates random errors.",
    log_prints=True,
    flow_run_name=lambda: f"currency-run-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
)
def currency_pipeline():
    logger.info("Starting Currency Exchange Pipeline")

    # Try to inject error (30% chance) before fetching real rates
    maybe_inject_error()

    # Extract
    raw = fetch_rates(base=BASE_CCY, targets=TARGET_CCY)

    # Transform
    df = transform(raw)

    # Save
    path = save_csv(df)

    logger.info("Pipeline complete | output: %s", path)
    return path


# ----- Entrypoint -----
if __name__ == "__main__":
    currency_pipeline()
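
To make the transform step concrete, here is a sketch of the same reshaping in plain Python, without pandas or Prefect. The sample payload only contains the keys the task reads (base, date, rates). The rate values are made up for illustration, and to_rows is a hypothetical helper:

```python
from datetime import datetime, timezone

# Illustrative payload: only the keys used by the transform task, with made-up rates.
sample = {
    "base": "EUR",
    "date": "2024-01-05",
    "rates": {"USD": 1.09, "GBP": 0.86},
}


def to_rows(data: dict) -> list[dict]:
    """Turn one API payload into one row per target currency."""
    # Take the timestamp once so every row of a run shares the same value.
    extracted_at = datetime.now(timezone.utc).isoformat()
    return [
        {
            "extracted_at": extracted_at,
            "base_currency": data["base"],
            "target_currency": currency,
            "rate": rate,
            "date": data["date"],
        }
        for currency, rate in data["rates"].items()
    ]


rows = to_rows(sample)
for row in rows:
    print(row["target_currency"], row["rate"])
```

Unlike the task above, this variant computes extracted_at once per call, which makes it easier to group rows by run later on.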


Currency Exchange Flow

flow_currency.py is the main pipeline. It has four tasks:

# dataflow/flow_currency.py

@task(name="fetch_exchange_rates", retries=1, retry_delay_seconds=5)
def fetch_rates(base: str, targets: list[str]) -> dict:
    """Fetch live EUR exchange rates from Frankfurter API."""
    ...

@task(name="inject_error_simulation")
def maybe_inject_error() -> bool:
    """Injects a controlled failure 30% of the time by fetching the invalid currency XXX."""
    ...

@task(name="transform_to_dataframe")
def transform(data: dict) -> pd.DataFrame:
    """Transforms raw API response into a structured DataFrame."""
    ...

@task(name="save_to_csv")
def save_csv(df: pd.DataFrame) -> str:
    """Appends new rates to data/currency_rates.csv."""
    ...

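The error injection (maybe_inject_error) is intentional. It simulates real-world pipeline failures at a 30% rate by requesting an invalid currency code (XXX), which triggers an HTTP error. Before the exception is raised, the task logs the response with a message containing "Error", so the failure shows up in Loki under the same filter the agent uses.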
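
The 30% figure comes from comparing a uniform random number with ERROR_RATE. A small simulation makes this easy to check. The sketch below uses a seeded random.Random instance so the result is repeatable. The helper should_inject is hypothetical and only mirrors the condition used in the task:

```python
import random

ERROR_RATE = 0.3


def should_inject(rng: random.Random) -> bool:
    """Same condition as maybe_inject_error, with an injectable random source."""
    return rng.random() < ERROR_RATE


rng = random.Random(42)  # fixed seed so the simulation is repeatable
runs = 10_000
failures = sum(should_inject(rng) for _ in range(runs))
observed_rate = failures / runs
print(f"{failures} simulated failures in {runs} runs ({observed_rate:.1%})")
```

Passing the random source as a parameter also makes it possible to force a failure or a success in a unit test, instead of waiting for one to happen.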


LLM Agent Flow

logger_agent_pipelines.py is the agent layer. It runs as a separate Prefect flow and does three things:

1. Fetch errors from Loki using the HTTP API with a LogQL query:

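import time

import httpx


def fetch_errors_from_loki():
    now_ns = time.time_ns()
    # Last 3 days, in nanoseconds (computation assumed, not shown in the original excerpt).
    start = now_ns - 3 * 24 * 60 * 60 * 10**9
    query_params = {
        "query": '{application="logging-agent-llm"} |= `Error`',
        "start": str(start),
        "end": str(now_ns),
    }
    response = httpx.get(LOKI_URL, params=query_params)
    ...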

2. Send logs to Gemma3 via Ollama with a structured system prompt.

3. Send the diagnosis by email via SMTP.
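
Steps 1 and 2 can be sketched together without touching the network. The block below assumes Loki's standard query_range response shape (streams carrying [timestamp, line] pairs) and the request body of Ollama's /api/chat endpoint. The sample log lines, the system prompt wording, and the helper names extract_lines and build_chat_request are hypothetical, not taken from the repository:

```python
import json

# Hypothetical response, shaped like a Loki query_range result.
sample_response = {
    "status": "success",
    "data": {
        "resultType": "streams",
        "result": [
            {
                "stream": {"application": "logging-agent-llm"},
                "values": [
                    ["1700000002000000000", "Error fetching exchange rates: 404"],
                    ["1700000001000000000", "Error received for url /latest: Status 404"],
                ],
            }
        ],
    },
}


def extract_lines(payload: dict) -> list[str]:
    """Flatten all streams into a single list of log lines, oldest first."""
    entries = []
    for stream in payload.get("data", {}).get("result", []):
        for timestamp, line in stream.get("values", []):
            entries.append((int(timestamp), line))
    return [line for _, line in sorted(entries)]


def build_chat_request(lines: list[str], model: str = "gemma3:4b") -> dict:
    """Build the JSON body for a non-streaming chat request to Ollama."""
    system_prompt = (  # hypothetical wording
        "You are a data pipeline assistant. Read the error logs, "
        "explain the likely root cause, and suggest a fix."
    )
    return {
        "model": model,
        "stream": False,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": "\n".join(lines)},
        ],
    }


lines = extract_lines(sample_response)
body = build_chat_request(lines)
print(json.dumps(body, indent=2))
```

The resulting dictionary would be sent as JSON to the local Ollama server, which listens on http://localhost:11434 by default. Note that |= is a case-sensitive substring filter, so it only catches lines whose text contains "Error" exactly as written.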


Step 5 - Run the currency pipeline

With the Prefect server running, start the flow:

python dataflow/flow_currency.py

Or, if the flow is served as a Prefect deployment, trigger a run from the UI or from the CLI:

prefect deployment run 'currency-exchange-pipeline/currency-pipeline'


Check that logs are arriving in Grafana. Open http://localhost:3000 and run this query:

{application="logging-agent-llm"}


Filter only errors:

{application="logging-agent-llm"} |= `Error`



Step 6 - Run the LLM Agent

python flows/logger_agent_pipelines.py

This serves the agent as a long-running Prefect deployment. You can also trigger it manually:

prefect deployment run 'LLM Mac Log Agent/mac-log-analyzer'

See the deployment in the Prefect UI, run the pipeline, then check the logs, both in Prefect and in the output of the local service.


Result


The full loop works end to end:

  1. Prefect runs the currency pipeline on schedule.
  2. All logs (INFO, WARNING, ERROR) are shipped to Loki in real time.
  3. Grafana provides full visibility over log streams.
  4. When errors occur, the LLM Agent fetches them, runs a local Gemma3 diagnosis, and sends a structured email. No cloud LLM API is needed, so there is no usage cost and the logs never leave the machine.
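
The last step of the loop, the email, needs nothing beyond the standard library. This is a minimal sketch, assuming STARTTLS on port 587 as in the .env example. The subject line and the function names are hypothetical, and the repository's actual implementation may differ:

```python
import smtplib
from email.message import EmailMessage


def build_diagnosis_email(sender: str, receiver: str, diagnosis: str) -> EmailMessage:
    """Wrap the LLM diagnosis in a plain-text email."""
    message = EmailMessage()
    message["Subject"] = "Pipeline error diagnosis"  # hypothetical subject line
    message["From"] = sender
    message["To"] = receiver
    message.set_content(diagnosis)
    return message


def send_email(message: EmailMessage, server: str, port: int, password: str) -> None:
    """Send the message over SMTP, upgrading the connection with STARTTLS."""
    with smtplib.SMTP(server, port, timeout=30) as smtp:
        smtp.starttls()
        smtp.login(message["From"], password)
        smtp.send_message(message)


email = build_diagnosis_email(
    "your@gmail.com",
    "receiver@gmail.com",
    "Root cause: the pipeline requested the invalid currency code XXX.",
)
print(email["Subject"])
```

In real use, send_email would receive SMTP_SERVER, SMTP_PORT and SENDER_PASSWORD from the environment. With Gmail, the password has to be an app password rather than the account password.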
