Source code for blocksnet.method.provision.provision

import math
from itertools import product
from typing import Literal

import contextily as cx
import geopandas as gpd
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.gridspec import GridSpec
from pulp import PULP_CBC_CMD, LpMinimize, LpProblem, LpVariable, lpSum

from ...models import Block, ServiceType
from ..base_method import BaseMethod


[docs]class Provision(BaseMethod): """Class provides methods for service type provision assessment""" def _add_basemap(self, ax): ... # cx.add_basemap(ax, source=cx.providers.Stamen.TonerLite, crs=f"EPSG:{self.city_model.epsg}")
[docs] def plot(self, gdf: gpd.GeoDataFrame): """Visualizes provision assessment results""" ax = gdf.plot(color="#ddd") gdf.plot(ax=ax, column="provision", cmap="RdYlGn", vmin=0, vmax=1, legend=True) ax.set_title("Provision: " + f"{self.total_provision(gdf): .3f}") ax.set_axis_off() self._add_basemap(ax)
[docs] def plot_delta(self, gdf_before: gpd.GeoDataFrame, gdf_after: gpd.GeoDataFrame): gdf = gdf_after.copy() ax = gdf.plot(color="#ddd") gdf = gdf.loc[gdf["provision"] != 0] gdf["provision"] -= gdf_before["provision"] gdf.loc[gdf["provision"] != 0].plot(column="provision", ax=ax, cmap="PuOr", vmin=-1, vmax=1, legend=True) prov_delta = self.total_provision(gdf_after) - self.total_provision(gdf_before) ax.set_title("Provision delta: " + f"{prov_delta: .3f}") ax.set_axis_off() self._add_basemap(ax)
[docs] def plot_provisions(self, provisions: dict[str, gpd.GeoDataFrame]): def show_me_chart(ax, gdf, name, sum): gdf.plot(column="provision", legend=True, ax=ax, cmap="RdYlGn", vmin=0, vmax=1) gdf[gdf["demand"] == 0].plot(ax=ax, color="#ddd", alpha=1) ax.set_title(str(name.name) + " provision: " + f"{sum: .3f}") ax.set_axis_off() n_plots = len(provisions) _, axs = plt.subplots(nrows=n_plots // 2 + n_plots % 2, ncols=2, figsize=(20, 20)) for i, (service_type, provision_gdf) in enumerate(provisions.items()): show_me_chart(axs[i // 2][i % 2], provision_gdf, service_type, self.total_provision(provision_gdf)) plt.show()
[docs] def _get_filtered_blocks(self, service_type: ServiceType, type: Literal["demand", "capacity"]) -> list[Block]: """Get blocks filtered by demand or capacity greater than 0""" return list(filter(lambda b: b[service_type.name][type] > 0, self.city_model.blocks))
def _get_sorted_neighbors(self, block, capacity_blocks: list[Block]): return sorted(capacity_blocks, key=lambda b: self.city_model.get_distance(block, b))
[docs] def _get_blocks_gdf(self, service_type: ServiceType) -> gpd.GeoDataFrame: """Returns blocks gdf for provision assessment""" data: list[dict] = [] for block in self.city_model.blocks: data.append({"id": block.id, "geometry": block.geometry, **block[service_type.name]}) gdf = gpd.GeoDataFrame(data).set_index("id").set_crs(epsg=self.city_model.epsg) gdf["demand_left"] = gdf["demand"] gdf["demand_within"] = 0 gdf["demand_without"] = 0 gdf["capacity_left"] = gdf["capacity"] return gdf
[docs] @classmethod def stat_provision(cls, gdf: gpd.GeoDataFrame): return { "mean": gdf["provision"].mean(), "median": gdf["provision"].median(), "min": gdf["provision"].min(), "max": gdf["provision"].max(), }
[docs] @classmethod def total_provision(cls, gdf: gpd.GeoDataFrame): return gdf["demand_within"].sum() / gdf["demand"].sum()
[docs] def calculate_scenario( self, scenario: dict[str, float], update_df: pd.DataFrame = None, method: Literal["iterative", "lp"] = "lp", ) -> tuple[dict[str, gpd.GeoDataFrame], float]: result = {} total = 0 for service_type, weight in scenario.items(): prov_gdf = self.calculate(service_type, update_df, method) result[service_type] = prov_gdf total += weight * self.total_provision(prov_gdf) return result, total
[docs] def calculate( self, service_type: ServiceType | str, update_df: pd.DataFrame = None, method: Literal["iterative", "lp"] = "lp" ) -> gpd.GeoDataFrame: """Provision assessment using certain method for the current city and service type, can be used with certain updated blocks GeoDataFrame""" if not isinstance(service_type, ServiceType): service_type = self.city_model[service_type] gdf = self._get_blocks_gdf(service_type) if update_df is not None: gdf = gdf.join(update_df) gdf[update_df.columns] = gdf[update_df.columns].fillna(0) if not "population" in gdf: gdf["population"] = 0 gdf["demand"] += gdf["population"].apply(service_type.calculate_in_need) gdf["demand_left"] = gdf["demand"] if not service_type.name in gdf: gdf[service_type.name] = 0 gdf["capacity"] += gdf[service_type.name] gdf["capacity_left"] += gdf[service_type.name] match method: case "lp": gdf = self._lp_provision(gdf, service_type) case "iterative": gdf = self._iterative_provision(gdf, service_type) gdf["provision"] = gdf["demand_within"] / gdf["demand"] return gdf
[docs] def _lp_provision(self, gdf: gpd.GeoDataFrame, service_type: ServiceType) -> gpd.GeoDataFrame: """Linear programming assessment method""" gdf = gdf.copy() delta = gdf["demand"].sum() - gdf["capacity"].sum() fictive_index = None fictive_column = None fictive_block_id = gdf.index.max() + 1 if delta > 0: fictive_column = fictive_block_id gdf.loc[fictive_column, "capacity"] = delta if delta < 0: fictive_index = fictive_block_id gdf.loc[fictive_index, "demand"] = -delta gdf["capacity_left"] = gdf["capacity"] def _get_weight(id1: int, id2: int): if id1 == fictive_block_id or id2 == fictive_block_id: return 0 block1 = self.city_model[id1] block2 = self.city_model[id2] return int(self.city_model.get_distance(block1, block2)) demand = gdf.loc[gdf["demand"] > 0] capacity = gdf.loc[gdf["capacity"] > 0] prob = LpProblem("Transportation", LpMinimize) x = LpVariable.dicts("Route", product(demand.index, capacity.index), 0, None) prob += lpSum(_get_weight(n, m) * x[n, m] for n in demand.index for m in capacity.index) for n in demand.index: prob += lpSum(x[n, m] for m in capacity.index) == demand.loc[n, "demand"] for m in capacity.index: prob += lpSum(x[n, m] for n in demand.index) == capacity.loc[m, "capacity"] prob.solve(PULP_CBC_CMD(msg=False)) # make the output for var in prob.variables(): value = var.value() name = var.name.replace("(", "").replace(")", "").replace(",", "").split("_") if name[2] == "dummy": continue a = int(name[1]) b = int(name[2]) weight = _get_weight(a, b) if value > 0: if weight <= service_type.accessibility: if fictive_index != None and a != fictive_index: gdf.loc[a, "demand_within"] = value gdf.loc[b, "capacity_left"] = value if fictive_column != None and b != fictive_column: gdf.loc[a, "demand_within"] = value gdf.loc[b, "capacity_left"] = value else: if fictive_index != None and a != fictive_index: gdf.loc[a, "demand_without"] = value gdf.loc[b, "capacity_left"] = value if fictive_column != None and b != fictive_column: gdf.loc[a, "demand_without"] = value gdf.loc[b, "capacity_left"] = value if fictive_index != None or fictive_column != None: gdf.drop(labels=[fictive_block_id], inplace=True) return gdf
[docs] def _iterative_provision(self, gdf: gpd.GeoDataFrame, service_type: ServiceType) -> gpd.GeoDataFrame: """Iterative provision assessment method""" demand_blocks = self._get_filtered_blocks(service_type, "demand") capacity_blocks = self._get_filtered_blocks(service_type, "capacity") while len(demand_blocks) > 0 and len(capacity_blocks) > 0: for demand_block in demand_blocks: neighbors = self._get_sorted_neighbors(demand_block, capacity_blocks) if len(neighbors) == 0: break capacity_block = neighbors[0] gdf.loc[demand_block.id, "demand_left"] -= 1 weight = self.city_model.get_distance(demand_block, capacity_block) if weight <= service_type.accessibility: gdf.loc[demand_block.id, "demand_within"] += 1 else: gdf.loc[demand_block.id, "demand_without"] += 1 if gdf.loc[demand_block.id, "demand_left"] == 0: demand_blocks.remove(demand_block) gdf.loc[capacity_block.id, "capacity_left"] -= 1 if gdf.loc[capacity_block.id, "capacity_left"] == 0: capacity_blocks.remove(capacity_block) return gdf