Data Engineer
The 10‑Year Health Plan calls for a shift to digital, preventative, community‑oriented services. Data engineers make it real by delivering timely, trusted, well‑governed data for analysts, clinicians, and services. This path focuses on data quality, lineage, and automation you can ship now.
👤 Role snapshot
You ensure NHS teams get clean, reliable, timely data — from ingestion to analytics. Typical tasks: ingesting source files/feeds, modelling into curated layers, enforcing quality checks, scheduling refreshes, documenting lineage, and enabling secure access.
🎯 Outcomes to target (aligned to the Plan)
- Freshness/SLA: % successful runs, median runtime, time‑to‑availability
- Quality: nulls, duplicates, range checks, join key coverage, small‑number suppression readiness
- Lineage: source → staging → curated → product (documented & versioned)
- Cost: efficient storage (Parquet/partitioning), right‑sized compute, avoid over‑refreshing
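These outcomes are easier to hit if they are measured from day one. A minimal sketch of turning run logs into the freshness and reliability figures above, assuming a hypothetical logs/pipeline_runs.csv that each run appends to (run_status, started_at, finished_at and source_arrived_at are illustrative column names):
import pandas as pd

# Hypothetical run log: one row per pipeline run, appended by each job
runs = pd.read_csv(
    "logs/pipeline_runs.csv",
    parse_dates=["started_at", "finished_at", "source_arrived_at"],
)

success_rate = (runs["run_status"] == "success").mean()
runtime_min = (runs["finished_at"] - runs["started_at"]).dt.total_seconds() / 60
availability_min = (runs["finished_at"] - runs["source_arrived_at"]).dt.total_seconds() / 60

print(f"% successful runs: {success_rate:.1%}")
print(f"median runtime (min): {runtime_min.median():.1f}")
print(f"median time-to-availability (min): {availability_min.median():.1f}")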
⚙️ 90‑minute quickstart
Goal: build a tiny, reproducible pipeline: ingest → transform → validate → publish on SQL Server + files.
1) Ingest (Python → SQL Server) (≈20 min)
import os, urllib.parse, pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv; load_dotenv()

params = urllib.parse.quote_plus(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={os.getenv('SQLSERVER_SERVER')};"
    f"DATABASE={os.getenv('SQLSERVER_DATABASE')};"
    "Trusted_Connection=Yes;Encrypt=Yes;"
)
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")

# Load de-identified CSV from a secure share
df = pd.read_csv('data/appointments_30d.csv')

# Minimal staging table (append-only here; swap for an upsert keyed on booking_id + practice_id in production)
with engine.begin() as con:
    con.exec_driver_sql("""
    IF OBJECT_ID('stg_appointments') IS NULL
        CREATE TABLE stg_appointments(
            booking_id  BIGINT      NOT NULL,
            practice_id VARCHAR(20) NOT NULL,
            booked_at   DATETIME2   NOT NULL,
            start_time  DATETIME2   NOT NULL,
            attended    BIT         NULL
        );
    """)
    df.to_sql('stg_appointments', con=con, if_exists='append', index=False)
print("staging load complete")
# .env (kept out of version control; see Secrets & .env)
SQLSERVER_SERVER=YOURSERVER
SQLSERVER_DATABASE=NHS_Analytics
Tip: For speed/traceability, keep a raw landing copy (CSV/Parquet) and a typed staging table. Index staging on common filters (e.g. start_time), as in the sketch below.
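A minimal sketch of that tip, reusing the connection settings from step 1; the landing/ path and index name are illustrative:
import os, urllib.parse
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv; load_dotenv()

# Raw landing copy: cheap, immutable, easy to re-process (Parquet needs pyarrow installed)
os.makedirs("landing", exist_ok=True)
df = pd.read_csv("data/appointments_30d.csv")
df.to_parquet("landing/appointments_30d.parquet", index=False)

# Same connection pattern as the ingest step
params = urllib.parse.quote_plus(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={os.getenv('SQLSERVER_SERVER')};"
    f"DATABASE={os.getenv('SQLSERVER_DATABASE')};"
    "Trusted_Connection=Yes;Encrypt=Yes;"
)
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")

# Index staging on the most common filter (start_time); guarded so re-runs are safe
with engine.begin() as con:
    con.exec_driver_sql("""
        IF NOT EXISTS (
            SELECT 1 FROM sys.indexes
            WHERE name = 'ix_stg_appointments_start_time'
              AND object_id = OBJECT_ID('stg_appointments')
        )
        CREATE INDEX ix_stg_appointments_start_time ON stg_appointments (start_time);
    """)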
2) Transform (dbt model → curated view) (≈20 min)
- Models (SQL)
- dbt config (snippets)
-- Staging model: models/stg/stg_appointments.sql
SELECT
    CAST(booking_id AS BIGINT) AS booking_id,
    practice_id,
    booked_at,
    start_time,
    CAST(attended AS INT) AS attended
FROM {{ source('landing','stg_appointments') }}

-- Mart model: 30-day appointment KPIs per practice.
-- PERCENTILE_CONT is window-only on SQL Server, so compute the median in a CTE
-- before aggregating (mixing it directly with GROUP BY would not compile).
WITH waits AS (
    SELECT
        practice_id,
        attended,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY DATEDIFF(MINUTE, booked_at, start_time))
            OVER (PARTITION BY practice_id) AS median_wait_minutes
    FROM {{ ref('stg_appointments') }}
    WHERE start_time >= DATEADD(DAY, -30, SYSUTCDATETIME())
)
SELECT
    practice_id,
    COUNT(*) AS total_appointments,
    AVG(CASE WHEN attended = 1 THEN 1.0 ELSE 0.0 END) AS attendance_rate,
    MAX(median_wait_minutes) AS median_wait_minutes
FROM waits
GROUP BY practice_id
# dbt_project.yml
name: nhs_pipeline
version: 1.0.0
profile: nhs_sqlserver
models:
  nhs_pipeline:
    +materialized: view
    stg:
      +schema: staging
    marts:
      +schema: marts

# profiles.yml
nhs_sqlserver:
  target: dev
  outputs:
    dev:
      type: sqlserver
      driver: "ODBC Driver 18 for SQL Server"
      server: YOURSERVER
      database: NHS_Analytics
      schema: dbo
      windows_login: True
      encrypt: True
Note: Install the SQL Server adapter for dbt (e.g. pip install dbt-sqlserver). Adjust schemas to match Trust conventions.
3) Validate (Great Expectations) (≈20 min)
import pandas as pd
import great_expectations as ge  # legacy pandas-dataset API (great_expectations < 1.0)

df = pd.read_parquet("out/kpi.parquet")  # produced by a transform or export step
gdf = ge.from_pandas(df)

gdf.expect_column_values_to_not_be_null("practice_id")
gdf.expect_column_values_to_be_between("attendance_rate", min_value=0, max_value=1)
gdf.expect_table_row_count_to_be_between(min_value=1)

results = gdf.validate()
print(results.success)
if not results.success:
    raise SystemExit("Data quality checks failed")
Minimal viable checks: non‑null keys, valid ranges, duplicate keys, recent freshness.
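A hedged sketch of the duplicate-key and freshness checks, assuming the curated output out/kpi.parquet and a landing copy landing/appointments_30d.parquet with a start_time column (the KPI table itself carries no timestamp to test); the 48-hour threshold is illustrative:
import pandas as pd
import great_expectations as ge  # legacy pandas-dataset API (great_expectations < 1.0)

# Duplicate keys on the curated KPI output
kpi = ge.from_pandas(pd.read_parquet("out/kpi.parquet"))
kpi.expect_column_values_to_not_be_null("practice_id")
kpi.expect_column_values_to_be_unique("practice_id")

# Freshness on the staging extract (assumed available as a landing Parquet copy)
stg = pd.read_parquet("landing/appointments_30d.parquet")
newest = pd.to_datetime(stg["start_time"]).max()
age_hours = (pd.Timestamp.now() - newest).total_seconds() / 3600

if not kpi.validate().success or age_hours > 48:  # illustrative freshness threshold
    raise SystemExit("Duplicate-key or freshness check failed")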
4) Orchestrate (Airflow TaskFlow) (≈30 min)
from datetime import datetime
from airflow.decorators import dag, task
import subprocess

@dag(schedule_interval="0 6 * * *", start_date=datetime(2024, 1, 1), catchup=False, tags=["nhs", "etl"])  # 06:00 daily
def nhs_etl():
    @task()
    def ingest():
        subprocess.check_call(["python", "ingest_appointments.py"])  # load CSV to staging

    @task()
    def transform():
        # run dbt build (models + tests) in repo directory
        subprocess.check_call(["dbt", "build", "--project-dir", "."])

    @task()
    def export_kpi():
        # Example: export curated KPI to Parquet for downstream apps
        subprocess.check_call(["python", "export_kpi.py"])  # writes out/kpi.parquet

    @task()
    def validate():
        subprocess.check_call(["python", "validate_kpi.py"])  # Great Expectations checks

    ingest() >> transform() >> export_kpi() >> validate()

nhs_etl()
Local first: develop each step locally; containerise later for consistent deploys.
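One way to exercise the whole DAG in-process before containerising, assuming Airflow 2.5+ where DAG.test() is available, is to bind the DAG object instead of the bare nhs_etl() call on the last line:
# Bind the DAG so the file can be executed directly for a local debug run (assumes Airflow 2.5+)
dag = nhs_etl()

if __name__ == "__main__":
    dag.test()  # runs every task in-process; no scheduler or webserver needed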
▶️ Run (local dev)
# Python env
python -m venv .venv && . .venv/bin/activate   # Windows: .venv\Scripts\activate
pip install pandas pyarrow sqlalchemy pyodbc python-dotenv "great_expectations<1.0"   # pyarrow for Parquet; GE pinned for the legacy ge.from_pandas API

# Ingest → SQL Server
python ingest_appointments.py

# (Optional) dbt
pip install dbt-sqlserver
dbt deps && dbt build

# Export curated KPI to Parquet (example)
python export_kpi.py

# Validate
python validate_kpi.py
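export_kpi.py is referenced but not shown; a minimal sketch, assuming the curated model is published as a view called kpi_appointments (adjust schema and name to your dbt conventions):
# export_kpi.py (minimal sketch): the view name below is an assumption, not a fixed convention
import os, urllib.parse
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv; load_dotenv()

params = urllib.parse.quote_plus(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={os.getenv('SQLSERVER_SERVER')};"
    f"DATABASE={os.getenv('SQLSERVER_DATABASE')};"
    "Trusted_Connection=Yes;Encrypt=Yes;"
)
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")

KPI_VIEW = os.getenv("KPI_VIEW", "dbo.kpi_appointments")  # hypothetical default; match your dbt schema
os.makedirs("out", exist_ok=True)

kpi = pd.read_sql(f"SELECT * FROM {KPI_VIEW}", engine)
kpi.to_parquet("out/kpi.parquet", index=False)  # requires pyarrow
print(f"exported {len(kpi)} rows to out/kpi.parquet")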
🗓️ Week‑one build (repeatable, safe)
Day 1 — Sources & contracts
- Identify authoritative sources; define keys & update cadence.
- Create landing (raw) and staging layers; document data dictionary.
Day 2 — Transform & tests
- Add dbt models for key entities; include unique/not‑null tests.
- Partition/cluster heavy tables; index common filters.
Day 3 — Quality & lineage
- Add Great Expectations checks; record lineage (README + diagrams).
- Add “data last updated” stamps in downstream products.
Day 4 — Orchestration & monitoring
- Schedule daily runs; set alerts for failures/late data.
- Log run durations and row counts per step.
Day 5 — Governance & sharing
- Secrets via Trust store; pseudonymisation where required.
- Small‑number suppression rules; publish curated access policy.
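Small-number suppression (Day 5 and the IG checklist) is worth wiring into the publish step early. A minimal sketch, assuming a suppress-below-5 rule on the out/kpi.parquet output above; thresholds and rounding rules vary by local policy:
import pandas as pd

def suppress_small_numbers(df: pd.DataFrame, count_cols: list[str], threshold: int = 5) -> pd.DataFrame:
    """Blank out counts from 1 to threshold-1 before publishing (threshold per local IG policy)."""
    out = df.copy()
    for col in count_cols:
        out[col] = out[col].astype("Int64")                        # nullable ints so suppressed cells can be NA
        out.loc[out[col].between(1, threshold - 1), col] = pd.NA   # keep zeros and counts >= threshold
    return out

kpi = pd.read_parquet("out/kpi.parquet")
published = suppress_small_numbers(kpi, ["total_appointments"])
published.to_parquet("out/kpi_published.parquet", index=False)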
🧰 Open‑source augmentations (pick 2–3)
- Airflow: schedule & monitor DAGs; retry & alerting.
- dbt: SQL‑first transforms, tests, docs & lineage.
- Great Expectations: automated data quality checks.
- DuckDB: fast local SQL over CSV/Parquet; dev prototyping.
- Streams for near‑real‑time events.
- Object storage for Parquet + versioned data lake.
See also: DuckDB · Docker · Git · GitHub · Secrets & .env
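For the DuckDB option, a quick prototyping sketch against the landing Parquet copy (pip install duckdb assumed; column names follow the staging example):
import duckdb

# Profile the landing file directly with SQL: no database or server needed
con = duckdb.connect()  # in-memory
print(con.sql("""
    SELECT practice_id,
           COUNT(*) AS total_appointments,
           AVG(CASE WHEN attended = 1 THEN 1.0 ELSE 0 END) AS attendance_rate
    FROM 'landing/appointments_30d.parquet'
    GROUP BY practice_id
    ORDER BY total_appointments DESC
""").df())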
🛡️ IG & safety checklist
- Keep secrets out of code; use environment/secret stores.
- Use synthetic/de‑identified data in dev; audit logs for prod.
- Apply small‑number suppression and aggregation where required.
- Record data lineage and owners; store DPIA/approvals alongside the repo.
- Access control by role; least‑privilege service accounts.
📏 Measuring impact
- Reliability: success rate, MTTR, on‑time completion.
- Freshness: time from source arrival to curated availability.
- Quality: % checks passing; duplicate/null key rate.
- Cost/efficiency: storage footprint; compute time; avoided re‑runs.
📚 References
- 10‑Year Health Plan (Executive Summary) — Powering transformation
  https://www.gov.uk/government/publications/10-year-health-plan-for-england-fit-for-the-future/fit-for-the-future-10-year-health-plan-for-england-executive-summary#powering-transformation-innovation-to-drive-healthcare-reform
- 10‑Year Health Plan (Executive Summary) — From analogue to digital
  https://www.gov.uk/government/publications/10-year-health-plan-for-england-fit-for-the-future/fit-for-the-future-10-year-health-plan-for-england-executive-summary#from-analogue-to-digital-power-in-your-hands
What’s next?
You’ve completed the Persona — Data Engineer stage. Keep momentum: