pypsa-eur/scripts/build_gdp_pop_non_nuts3.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

150 lines
5.1 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: : 2017-2024 The PyPSA-Eur Authors
#
# SPDX-License-Identifier: MIT
"""
Maps the per-capita GDP and population values to non-NUTS3 regions.
The script takes as input the country code, a GeoDataFrame containing
the regions, and the file paths to the datasets containing the GDP and
POP values for non-NUTS3 countries.
"""
import logging
import geopandas as gpd
import numpy as np
import pandas as pd
import pypsa
import rasterio
import xarray as xr
from _helpers import configure_logging, set_scenario_config
from rasterio.mask import mask
from shapely.geometry import box
logger = logging.getLogger(__name__)
def calc_gdp_pop(country, regions, gdp_non_nuts3, pop_non_nuts3):
"""
Calculate the GDP p.c. and population values for non NUTS3 regions.
Parameters:
country (str): The two-letter country code of the non-NUTS3 region.
regions (GeoDataFrame): A GeoDataFrame containing the regions.
gdp_non_nuts3 (str): The file path to the dataset containing the GDP p.c values
for non NUTS3 countries (e.g. MD, UA)
pop_non_nuts3 (str): The file path to the dataset containing the POP values
for non NUTS3 countries (e.g. MD, UA)
Returns:
tuple: A tuple containing two GeoDataFrames:
- gdp: A GeoDataFrame with the mean GDP p.c. values mapped to each bus.
- pop: A GeoDataFrame with the summed POP values mapped to each bus.
"""
regions = regions.rename(columns={"name": "Bus"}).set_index("Bus")
regions = regions[regions.country == country]
# Create a bounding box for UA, MD from region shape, including a buffer of 10000 metres
bounding_box = (
gpd.GeoDataFrame(geometry=[box(*regions.total_bounds)], crs=regions.crs)
.to_crs(epsg=3857)
.buffer(10000)
.to_crs(regions.crs)
)
# GDP Mapping
logger.info(f"Mapping mean GDP p.c. to non-NUTS3 region: {country}")
with xr.open_dataset(gdp_non_nuts3) as src_gdp:
src_gdp = src_gdp.where(
(src_gdp.longitude >= bounding_box.bounds.minx.min())
& (src_gdp.longitude <= bounding_box.bounds.maxx.max())
& (src_gdp.latitude >= bounding_box.bounds.miny.min())
& (src_gdp.latitude <= bounding_box.bounds.maxy.max()),
drop=True,
)
gdp = src_gdp.to_dataframe().reset_index()
gdp = gdp.rename(columns={"GDP_per_capita_PPP": "gdp"})
gdp = gdp[gdp.time == gdp.time.max()]
gdp_raster = gpd.GeoDataFrame(
gdp,
geometry=gpd.points_from_xy(gdp.longitude, gdp.latitude),
crs="EPSG:4326",
)
gdp_mapped = gpd.sjoin(gdp_raster, regions, predicate="within")
gdp = (
gdp_mapped.copy()
.groupby(["Bus", "country"])
.agg({"gdp": "mean"})
.reset_index(level=["country"])
)
# Population Mapping
logger.info(f"Mapping summed population to non-NUTS3 region: {country}")
with rasterio.open(pop_non_nuts3) as src_pop:
# Mask the raster with the bounding box
out_image, out_transform = mask(src_pop, bounding_box, crop=True)
out_meta = src_pop.meta.copy()
out_meta.update(
{
"driver": "GTiff",
"height": out_image.shape[1],
"width": out_image.shape[2],
"transform": out_transform,
}
)
masked_data = out_image[0] # Use the first band (rest is empty)
row_indices, col_indices = np.where(masked_data != src_pop.nodata)
values = masked_data[row_indices, col_indices]
# Affine transformation from pixel coordinates to geo coordinates
x_coords, y_coords = rasterio.transform.xy(out_transform, row_indices, col_indices)
pop_raster = pd.DataFrame({"x": x_coords, "y": y_coords, "pop": values})
pop_raster = gpd.GeoDataFrame(
pop_raster,
geometry=gpd.points_from_xy(pop_raster.x, pop_raster.y),
crs=src_pop.crs,
)
pop_mapped = gpd.sjoin(pop_raster, regions, predicate="within")
pop = (
pop_mapped.groupby(["Bus", "country"])
.agg({"pop": "sum"})
.reset_index()
.set_index("Bus")
)
gdp_pop = regions.join(gdp.drop(columns="country"), on="Bus").join(
pop.drop(columns="country"), on="Bus"
)
gdp_pop.fillna(0, inplace=True)
return gdp_pop
if __name__ == "__main__":
if "snakemake" not in globals():
from _helpers import mock_snakemake
snakemake = mock_snakemake("build_gdp_pop_non_nuts3")
configure_logging(snakemake)
set_scenario_config(snakemake)
n = pypsa.Network(snakemake.input.base_network)
regions = gpd.read_file(snakemake.input.regions)
gdp_non_nuts3 = snakemake.input.gdp_non_nuts3
pop_non_nuts3 = snakemake.input.pop_non_nuts3
subset = {"MD", "UA"}.intersection(snakemake.params.countries)
gdp_pop = pd.concat(
[
calc_gdp_pop(country, regions, gdp_non_nuts3, pop_non_nuts3)
for country in subset
],
axis=0,
)
logger.info(
f"Exporting GDP and POP values for non-NUTS3 regions {snakemake.output}"
)
gdp_pop.reset_index().to_file(snakemake.output, driver="GeoJSON")