
Data Engineer

🔧 Pipelines · SQL Server · Python ETL · Airflow · dbt · Great Expectations
🎯 Why this matters now

The 10‑Year Health Plan calls for a shift to digital, preventative, community‑oriented services. Data engineers make it real by delivering timely, trusted, well‑governed data for analysts, clinicians, and services. This path focuses on data quality, lineage, and automation you can ship now.


👤 Role snapshot

You ensure NHS teams get clean, reliable, timely data — from ingestion to analytics. Typical tasks: ingesting source files/feeds, modelling into curated layers, enforcing quality checks, scheduling refreshes, documenting lineage, and enabling secure access.


🎯 Outcomes to target (aligned to the Plan)

Data freshness · Reliability (SLA) · Data quality · Lineage · Cost awareness
  • Freshness/SLA: % successful runs, median runtime, time‑to‑availability (see the measurement sketch after this list)
  • Quality: nulls, duplicates, range checks, join key coverage, small‑number suppression readiness
  • Lineage: source → staging → curated → product (documented & versioned)
  • Cost: efficient storage (Parquet/partitioning), right‑sized compute, avoid over‑refreshing
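
For example, if each run appends a row to a simple run log (a hypothetical logs/run_log.csv with started_at, finished_at and status columns; not part of the pipeline below), the freshness/SLA figures can be sketched in a few lines of pandas:

sla_metrics.py (illustrative)
import pandas as pd

# Hypothetical run log: one row per pipeline run
runs = pd.read_csv("logs/run_log.csv", parse_dates=["started_at", "finished_at"])

success_rate = (runs["status"] == "success").mean()   # % successful runs
runtime_min = (runs["finished_at"] - runs["started_at"]).dt.total_seconds() / 60
print(f"success rate: {success_rate:.1%}, median runtime: {runtime_min.median():.1f} min")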

⚙️ 90‑minute quickstart

Goal: build a tiny, reproducible pipeline: ingest → transform → validate → publish on SQL Server + files.

1) Ingest (Python → SQL Server)  (≈20 min)

ingest_appointments.py
import os, urllib.parse, pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv; load_dotenv()

params = urllib.parse.quote_plus(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={os.getenv('SQLSERVER_SERVER')};"
    f"DATABASE={os.getenv('SQLSERVER_DATABASE')};"
    "Trusted_Connection=Yes;Encrypt=Yes;"
)
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")

# Load de-identified CSV from a secure share
df = pd.read_csv('data/appointments_30d.csv')

# Minimal staging table; the load below is append-only, so a production pipeline
# would upsert/merge on the composite key (booking_id, practice_id)
with engine.begin() as con:
    con.exec_driver_sql("""
        IF OBJECT_ID('stg_appointments') IS NULL
        CREATE TABLE stg_appointments(
            booking_id  BIGINT      NOT NULL,
            practice_id VARCHAR(20) NOT NULL,
            booked_at   DATETIME2   NOT NULL,
            start_time  DATETIME2   NOT NULL,
            attended    BIT         NULL
        );
    """)
    df.to_sql('stg_appointments', con=con, if_exists='append', index=False)

print("staging load complete")
.env (local only — do not commit)
SQLSERVER_SERVER=YOURSERVER
SQLSERVER_DATABASE=NHS_Analytics

Tip: For speed/traceability, keep a raw landing copy (CSV/Parquet) and a typed staging table. Index staging on common filters (e.g. start_time).
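
A minimal sketch of that pattern, reusing df and engine from ingest_appointments.py (the landing path and index name are illustrative):

import os
os.makedirs("landing", exist_ok=True)

# Raw landing copy alongside the typed staging table (Parquet needs pyarrow)
df.to_parquet("landing/appointments_30d.parquet", index=False)

# Index staging on a common filter, guarded so re-runs stay idempotent
with engine.begin() as con:
    con.exec_driver_sql("""
        IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name = 'ix_stg_appointments_start_time')
            CREATE INDEX ix_stg_appointments_start_time ON stg_appointments (start_time);
    """)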

2) Transform (dbt model → curated view)  (≈20 min)

models/stg/stg_appointments.sql
SELECT
    CAST(booking_id AS BIGINT) AS booking_id,
    practice_id,
    booked_at,
    start_time,
    CAST(attended AS INT) AS attended
FROM {{ source('landing', 'stg_appointments') }}
models/marts/vw_practice_kpi.sql
WITH appts AS (
    SELECT
        practice_id,
        attended,
        -- PERCENTILE_CONT is window-only in SQL Server, so compute the per-practice
        -- median here and collapse it after grouping
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY DATEDIFF(MINUTE, booked_at, start_time))
            OVER (PARTITION BY practice_id) AS median_wait_minutes
    FROM {{ ref('stg_appointments') }}
    WHERE start_time >= DATEADD(DAY, -30, SYSUTCDATETIME())
)
SELECT
    practice_id,
    COUNT(*) AS total_appointments,
    AVG(CASE WHEN attended = 1 THEN 1.0 ELSE 0.0 END) AS attendance_rate,
    MAX(median_wait_minutes) AS median_wait_minutes
FROM appts
GROUP BY practice_id

Note: Install the SQL Server adapter for dbt (e.g. pip install dbt-sqlserver), declare the landing source in your dbt sources YAML so {{ source('landing', 'stg_appointments') }} resolves, and adjust schemas to match Trust conventions.

3) Validate (Great Expectations)  (≈20 min)

validate_kpi.py
import pandas as pd
import great_expectations as ge

df = pd.read_parquet("out/kpi.parquet") # produced by a transform or export step

gdf = ge.from_pandas(df)
gdf.expect_column_values_to_not_be_null("practice_id")
gdf.expect_column_values_to_be_between("attendance_rate", min_value=0, max_value=1)
gdf.expect_table_row_count_to_be_between(min_value=1)

results = gdf.validate()
print(results.success)
if not results.success:
    raise SystemExit("Data quality checks failed")

Minimal viable checks: non‑null keys, valid ranges, duplicate keys, recent freshness.
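
The script above covers the first two; the remaining checks could be added to validate_kpi.py before the gdf.validate() call, for example (a sketch using the same legacy ge.from_pandas interface, with an illustrative 24‑hour freshness threshold):

import os, datetime as dt

# Duplicate keys: each practice should appear only once in the KPI output
gdf.expect_column_values_to_be_unique("practice_id")

# Recent freshness: fail if the exported file is older than ~24 hours
age = dt.datetime.now() - dt.datetime.fromtimestamp(os.path.getmtime("out/kpi.parquet"))
if age > dt.timedelta(hours=24):
    raise SystemExit("out/kpi.parquet is stale")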

4) Orchestrate (Airflow TaskFlow)  (≈30 min)

dags/nhs_etl.py
from datetime import datetime
from airflow.decorators import dag, task
import subprocess

@dag(schedule_interval="0 6 * * *", start_date=datetime(2024, 1, 1), catchup=False, tags=["nhs", "etl"])  # 06:00 daily
def nhs_etl():
    @task()
    def ingest():
        subprocess.check_call(["python", "ingest_appointments.py"])  # load to staging

    @task()
    def transform():
        # run dbt build (models + tests) in the repo directory
        subprocess.check_call(["dbt", "build", "--project-dir", "."])

    @task()
    def export_kpi():
        # export curated KPI to Parquet for downstream apps
        subprocess.check_call(["python", "export_kpi.py"])  # writes out/kpi.parquet

    @task()
    def validate():
        subprocess.check_call(["python", "validate_kpi.py"])  # Great Expectations

    ingest() >> transform() >> export_kpi() >> validate()

nhs_etl()

Local first: develop each step locally; containerise later for consistent deploys.
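
The DAG (and the run steps below) call an export_kpi.py that isn't shown here. A minimal sketch, assuming the same connection settings as ingest_appointments.py and the vw_practice_kpi model from step 2 (adjust the schema to wherever dbt materialises it):

export_kpi.py (illustrative)
import os, urllib.parse, pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv; load_dotenv()

params = urllib.parse.quote_plus(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={os.getenv('SQLSERVER_SERVER')};"
    f"DATABASE={os.getenv('SQLSERVER_DATABASE')};"
    "Trusted_Connection=Yes;Encrypt=Yes;"
)
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")

# Read the curated KPI view and publish it as Parquet for downstream apps
kpi = pd.read_sql("SELECT * FROM vw_practice_kpi", engine)
kpi["last_refreshed"] = pd.Timestamp.now(tz="UTC")   # "data last updated" stamp
os.makedirs("out", exist_ok=True)
kpi.to_parquet("out/kpi.parquet", index=False)
print(f"exported {len(kpi)} rows to out/kpi.parquet")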


▶️ Run (local dev)

# Python env
python -m venv .venv && . .venv/bin/activate # Windows: .venv\Scripts\activate
pip install pandas sqlalchemy pyodbc python-dotenv great_expectations pyarrow  # pyarrow for Parquet

# Ingest → SQL Server
python ingest_appointments.py

# (Optional) dbt
pip install dbt-sqlserver
dbt deps && dbt build

# Export curated KPI to Parquet (example)
python export_kpi.py

# Validate
python validate_kpi.py

🗓️ Week‑one build (repeatable, safe)

Day 1 — Sources & contracts

  • Identify authoritative sources; define keys & update cadence.
  • Create landing (raw) and staging layers; document data dictionary.

Day 2 — Transform & tests

  • Add dbt models for key entities; include unique/not‑null tests.
  • Partition/cluster heavy tables; index common filters.

Day 3 — Quality & lineage

  • Add Great Expectations checks; record lineage (README + diagrams).
  • Add “data last updated” stamps in downstream products.

Day 4 — Orchestration & monitoring

  • Schedule daily runs; set alerts for failures/late data.
  • Log run durations and row counts per step (see the sketch below).
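
For the logging point, one lightweight option is to append something like this to ingest_appointments.py (a sketch; it assumes a run_log table with step_name, duration_s, row_count and logged_at columns already exists):

import time

started = time.monotonic()          # set before the staging load
# ... staging load as in ingest_appointments.py ...
with engine.begin() as con:
    con.exec_driver_sql(
        "INSERT INTO run_log(step_name, duration_s, row_count, logged_at) "
        "VALUES (?, ?, ?, SYSUTCDATETIME())",
        ("ingest", time.monotonic() - started, len(df)),
    )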

Day 5 — Governance & sharing

  • Secrets via Trust store; pseudonymisation where required.
  • Small‑number suppression rules; publish curated access policy.

🧰 Open‑source augmentations (pick 2–3)

  • Airflow: schedule & monitor DAGs; retries & alerting.
  • dbt: SQL‑first transforms, tests, docs & lineage.
  • Great Expectations: automated data quality checks.
  • DuckDB: fast local SQL over CSV/Parquet; dev prototyping (see the sketch after this list).
  • Kafka (optional): streams for near‑real‑time events.
  • MinIO/S3: object storage for Parquet + a versioned data lake.
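
For example, DuckDB can query the exported KPI file directly during development (a sketch; pip install duckdb first):

import duckdb

# Query the curated Parquet output without loading it anywhere
con = duckdb.connect()
print(con.execute("""
    SELECT practice_id, attendance_rate
    FROM 'out/kpi.parquet'
    ORDER BY attendance_rate
    LIMIT 5
""").df())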

See also: DuckDB · Docker · Git · GitHub · Secrets & .env


🛡️ IG & safety checklist

  • Keep secrets out of code; use environment/secret stores.
  • Use synthetic/de‑identified data in dev; audit logs for prod.
  • Apply small‑number suppression and aggregation where required (a sketch follows this list).
  • Record data lineage and owners; store DPIA/approvals alongside the repo.
  • Access control by role; least‑privilege service accounts.
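
As an illustration of the suppression point above (a sketch only; the threshold of 5 is a common convention, but apply your organisation's disclosure-control policy):

import pandas as pd

kpi = pd.read_parquet("out/kpi.parquet").convert_dtypes()   # nullable dtypes so NA is allowed

SUPPRESSION_THRESHOLD = 5   # illustrative; follow local policy
small = kpi["total_appointments"] < SUPPRESSION_THRESHOLD
kpi.loc[small, ["total_appointments", "attendance_rate", "median_wait_minutes"]] = pd.NA
kpi.to_parquet("out/kpi_shareable.parquet", index=False)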

📏 Measuring impact

  • Reliability: success rate, MTTR, on‑time completion.
  • Freshness: time from source arrival to curated availability.
  • Quality: % checks passing; duplicate/null key rate.
  • Cost/efficiency: storage footprint; compute time; avoided re‑runs.


What’s next?

You’ve completed the Persona — Data Engineer stage. Keep momentum: