from __future__ import annotations
import pickle
from functools import singledispatchmethod
import geopandas as gpd
import networkx as nx
import pandas as pd
from matplotlib import pyplot as plt
from pydantic import BaseModel, Field, InstanceOf
from shapely import LineString, Point, Polygon
from ..utils import SERVICE_TYPES
from .geodataframe import BaseRow, GeoDataFrame
from .service_type import ServiceType
[docs]class BuildingRow(BaseRow):
geometry: Point
population: int = Field(ge=0, default=0)
floors: float = Field(ge=0)
area: float = Field(ge=0)
living_area: float = Field(ge=0)
is_living: bool
[docs]class ServiceRow(BaseRow):
geometry: Point
capacity: int = Field(ge=0)
[docs]class Block(BaseModel):
"""Class presenting city block"""
id: int
"""Unique block identifier across the ```city```"""
geometry: InstanceOf[Polygon] = Field()
"""Block geometry presented as shapely ```Polygon```"""
landuse: str = None
"""Current city block landuse"""
buildings: InstanceOf[gpd.GeoDataFrame] = None
"""Buildings ```GeoDataFrame```"""
services: InstanceOf[dict[ServiceType, gpd.GeoDataFrame]] = {}
"""Services ```GeoDataFrames```s for different ```ServiceType```s"""
city: InstanceOf[City]
"""```City``` instance that contains the block"""
@property
def area(self):
return self.geometry.area
@property
def industrial_area(self):
if self.buildings is not None:
return self.buildings.area.sum()
else:
return 0
@property
def living_area(self):
if self.buildings is not None:
return self.buildings.living_area.sum()
else:
return 0
[docs] def to_dict(self, simplify=True) -> dict[str, int]:
dict = {"id": self.id, "geometry": self.geometry}
if not simplify:
for service_type in self.services:
dict[service_type.name] = self[service_type.name]["capacity"]
dict["population"] = self.population
dict["is_living"] = self.is_living
return dict
[docs] def __contains__(self, service_type_name: str) -> bool:
"""Returns True if service type is contained inside the block"""
service_type = self.city[service_type_name]
return service_type in self.services
[docs] def __getitem__(self, service_type_name: str) -> dict[str, int]:
"""Get service type capacity and demand of the block"""
service_type = self.city[service_type_name]
result = {"capacity": 0, "demand": service_type.calculate_in_need(self.population)}
if service_type in self.services:
result["capacity"] = self.services[service_type]["capacity"].sum()
return result
[docs] def update_buildings(self, gdf: GeoDataFrame[BuildingRow] = None):
"""Update buildings GeoDataFrame of the block"""
if gdf is None:
self.buildings = None
else:
self.buildings = gpd.GeoDataFrame(gdf)
[docs] def update_services(self, service_type: ServiceType, gdf: GeoDataFrame[ServiceRow] = None):
"""Update services GeoDataFrame of the block"""
if gdf is None:
del self.services[service_type]
else:
self.services[service_type] = gpd.GeoDataFrame(gdf)
@property
def is_living(self) -> bool:
if self.buildings is not None:
return self.buildings.is_living.any()
else:
return False
@property
def population(self) -> int:
"""Return sum population of the city block"""
if self.buildings is not None:
return self.buildings.population.sum()
else:
return 0
[docs] @classmethod
def from_gdf(cls, gdf: gpd.GeoDataFrame, city: City) -> "dict[int, Block]":
"""Generate blocks dict from ```GeoDataFrame```"""
dict = {}
for i in gdf.index:
dict[i] = cls(id=i, geometry=gdf.loc[i].geometry, city=city)
return dict
[docs] def __hash__(self):
"""Make block hashable, so it can be used as key in dict etc."""
return hash(self.id)
[docs]class City:
epsg: int
adjacency_matrix: pd.DataFrame
_blocks: dict[int, Block]
_service_types: dict[str, ServiceType]
def __init__(self, blocks_gdf: gpd.GeoDataFrame, adjacency_matrix: pd.DataFrame) -> None:
assert (blocks_gdf.index == adjacency_matrix.index).all(), "Matrix and blocks index don't match"
assert (blocks_gdf.index == adjacency_matrix.columns).all(), "Matrix columns and blocks index don't match"
self.epsg = blocks_gdf.crs.to_epsg()
self._blocks = Block.from_gdf(blocks_gdf, self)
self.adjacency_matrix = adjacency_matrix.copy()
self._service_types = {}
for st in SERVICE_TYPES:
service_type = ServiceType(**st)
self._service_types[service_type.name] = service_type
@property
def blocks(self) -> list[Block]:
"""Return list of blocks"""
return [self._blocks[id] for id in self._blocks]
@property
def service_types(self) -> list[ServiceType]:
"""Return list of service types"""
return [self._service_types[name] for name in self._service_types]
def __str__(self):
description = ""
description += f"CRS: : EPSG:{self.epsg}\n"
description += f"Blocks count : {len(self.blocks)}\n"
description += f"Service types : \n"
service_types_description = "\n".join([f" {st}" for st in self.service_types])
return description + service_types_description
[docs] def plot(self) -> None:
"""Plot city model data"""
blocks = self.get_blocks_gdf()
ax = blocks.plot(alpha=1, color="#ddd", figsize=[10, 10])
# plot buildings
self.get_buildings_gdf().plot(ax=ax, markersize=1, color="#bbb")
# plot services
self.get_services_gdf().plot(
ax=ax,
markersize=5,
column="service_type",
legend=True,
legend_kwds={"title": "Service types", "loc": "lower left"},
)
ax.set_axis_off()
[docs] def get_service_type_gdf(self, service_type: ServiceType | str):
if not isinstance(service_type, ServiceType):
service_type = self[service_type]
services_blocks = filter(lambda b: service_type in b.services, self.blocks)
services_gdfs = list(map(lambda b: b.services[service_type].to_crs(4326), services_blocks))
gdf = gpd.GeoDataFrame(columns=["geometry", "capacity", "service_type"]).set_geometry("geometry").set_crs(4326)
gdf = pd.concat([gdf, *services_gdfs], ignore_index=True)
gdf["service_type"] = service_type.name
return gdf
[docs] def get_buildings_gdf(self) -> gpd.GeoDataFrame:
buildings_blocks = filter(lambda b: b.buildings is not None, self.blocks)
buildings_gdfs = list(map(lambda b: b.buildings.to_crs(4326), buildings_blocks))
gdf = gpd.GeoDataFrame(columns=["geometry"]).set_geometry("geometry").set_crs(4326)
gdf = pd.concat([gdf, *buildings_gdfs], ignore_index=True).to_crs(self.epsg)
return gdf
[docs] def get_services_gdf(self) -> gpd.GeoDataFrame:
gdfs = map(lambda st: self.get_service_type_gdf(st).to_crs(4326), self.service_types)
return pd.concat(gdfs, axis=0, ignore_index=True).to_crs(self.epsg)
[docs] def get_blocks_gdf(self, simplify=True) -> gpd.GeoDataFrame:
data: list[dict] = []
for block in self.blocks:
data.append(block.to_dict(simplify))
gdf = gpd.GeoDataFrame(data).set_index("id").set_crs(epsg=self.epsg)
return gdf
[docs] def update_buildings(self, gdf: gpd.GeoDataFrame):
"""Update buildings in blocks"""
assert gdf.crs.to_epsg() == self.epsg, "Buildings GeoDataFrame CRS should match city EPSG"
# reset buildings of blocks
for block in self.blocks:
block.update_buildings()
# spatial join blocks and buildings and updated related blocks info
sjoin = gdf.sjoin(self.get_blocks_gdf())
groups = sjoin.groupby("index_right")
for block_id, buildings_gdf in groups:
self[block_id].update_buildings(GeoDataFrame[BuildingRow](buildings_gdf))
[docs] def update_services(self, service_type: ServiceType | str, gdf: gpd.GeoDataFrame):
"""Update services in blocks of certain service_type"""
assert gdf.crs.to_epsg() == self.epsg, "Services GeoDataFrame CRS should match city EPSG"
if not isinstance(service_type, ServiceType):
service_type = self[service_type]
# reset services of blocks
for block in filter(lambda b: service_type.name in b, self.blocks):
block.update_services(service_type)
# spatial join blocks and services and update related blocks info
sjoin = gdf.sjoin(self.get_blocks_gdf())
groups = sjoin.groupby("index_right")
for block_id, services_gdf in groups:
self[block_id].update_services(service_type, GeoDataFrame[ServiceRow](services_gdf))
[docs] def add_service_type(self, service_type: ServiceType):
if service_type.name in self:
raise KeyError(f"The service type with this name already exists: {service_type.name}")
else:
self._service_types[service_type.name] = service_type
[docs] def get_distance(self, block_a: int | Block, block_b: int | Block):
"""Returns distance (in min) between two blocks"""
if isinstance(block_a, Block):
block_a = block_a.id
if isinstance(block_b, Block):
block_b = block_b.id
return self.adjacency_matrix.loc[block_a, block_b]
[docs] def get_out_edges(self, block: int | Block):
"""Get out edges for certain block"""
if isinstance(block, Block):
block = block.id
return [(self[block], self[block_b], weight) for block_b, weight in self.adjacency_matrix.loc[block].items()]
[docs] def get_in_edges(self, block: int | Block):
"""Get in edges for certain block"""
if isinstance(block, Block):
block = block.id
return [(self[block_b], self[block], weight) for block_b, weight in self.adjacency_matrix.loc[:, block].items()]
@singledispatchmethod
def __getitem__(self, arg):
raise NotImplementedError(f"Can't access object with such argument type {type(arg)}")
# Make city_model subscriptable, to access block via ID like city_model[123]
@__getitem__.register(int)
def _(self, block_id):
if not block_id in self._blocks:
raise KeyError(f"Can't find block with such id: {block_id}")
return self._blocks[block_id]
# Make city_model subscriptable, to access service type via name like city_model['schools']
@__getitem__.register(str)
def _(self, service_type_name):
if not service_type_name in self._service_types:
raise KeyError(f"Can't find service type with such name: {service_type_name}")
return self._service_types[service_type_name]
@singledispatchmethod
def __contains__(self, arg):
raise NotImplementedError(f"Wrong argument type for 'in': {type(arg)}")
# Make 'in' check available for blocks, to access like 123 in city_model
@__contains__.register(int)
def _(self, block_id):
return block_id in self._blocks
# Make 'in' check available for service types, to access like 'schools' in city_model
@__contains__.register(str)
def _(self, service_type_name):
return service_type_name in self._service_types
[docs] @staticmethod
def from_pickle(file_path: str):
"""Load city model from a .pickle file"""
state = None
with open(file_path, "rb") as f:
state = pickle.load(f)
return state
[docs] def to_pickle(self, file_path: str):
"""Save city model to a .pickle file"""
with open(file_path, "wb") as f:
pickle.dump(self, f)