import re
from itertools import chain, combinations
import geopandas as gpd
import osmnx as ox
import pandas as pd
import numpy as np
from bertopic import BERTopic
from hdbscan import HDBSCAN
from shapely.geometry import LineString
from transformers.pipelines import pipeline
from umap import UMAP
class EventDetection:
    """
    Detect events in geolocated text messages and connect related events.

    Messages are attached to objects of an urban spatial model (buildings,
    road links, roads and the city as a whole). For every object the texts
    are clustered semantically with BERTopic and each resulting topic is
    treated as an event. Events that share messages are connected.
    """

    # Weight used for messages whose category is not in ``functions_weights``.
    _DEFAULT_IMPORTANCE = 0.16
    # An object needs at least this many messages before topics are fitted.
    _MIN_MESSAGES = 5
    # BERTopic numbers its outlier topic -1, so the names and ids of outlier
    # events start with this prefix.
    _OUTLIER_PREFIX = "-1"

    def __init__(self):
        # NOTE: this seeds NumPy's *global* random generator.
        np.random.seed(42)
        self.population_filepath = None
        # Spatial levels ordered from the smallest object to the whole city.
        self.levels = ["building", "link", "road", "global"]
        self.levels_scale = dict(zip(self.levels, range(2, 10, 2)))
        # Importance weight of every message category.
        self.functions_weights = {
            "Безопасность": 0.12,
            "Благоустройство": 0.21,
            "Дороги": 0.18,
            "ЖКХ": 0.2,
            "Здравоохранение": 0.2,
            "Другое": 0.16,
            "Образование": 0.16,
            "Социальная защита": 0.13,
            "Строительство": 0.19,
            "Обращение с отходами": 0.19,
            "Транспорт": 0.17,
            "Экология": 0.22,
            "Энергетика": 0.19,
        }
        self.messages = None
        self.links = None
        self.buildings = None
        self.population = None
        self.topic_model = None
        self.events = None
        self.connections = None

    @staticmethod
    def _min_max(values):
        """Scale a numeric Series to [0, 1]; a constant Series becomes NaN."""
        lowest = values.min()
        return (values - lowest) / (values.max() - lowest)

    @staticmethod
    def _split_ids(value):
        """Return the set of ids held in a ", "-joined string or an iterable."""
        if isinstance(value, str):
            return set(value.split(", "))
        return {str(item) for item in value}

    @staticmethod
    def _join_unique(values):
        """Join values with ", ", dropping duplicates but keeping their order."""
        return ", ".join(dict.fromkeys(str(value) for value in values))

    @staticmethod
    def _shared_message_counts(message_ids):
        """
        Count the messages shared by every pair of events.

        Args:
            message_ids (iterable): One entry per event; each entry is either
                a ", "-joined string of message ids or an iterable of ids.
        Returns:
            list: Number of common ids for each pair, in the order produced
                by ``itertools.combinations(message_ids, 2)``.
        """
        id_sets = [EventDetection._split_ids(ids) for ids in message_ids]
        return [len(first & second) for first, second in combinations(id_sets, 2)]

    def _get_roads(self, city_name, city_crs) -> "gpd.GeoDataFrame":
        """
        Get the road network of a city as road links and roads.

        Args:
            city_name (string): The name of the city.
            city_crs (int): The spatial reference code (CRS) of the city.
        Returns:
            links (GeoDataFrame): Buffered road links in EPSG:4326 with the
                columns ``link_id``, ``name``, ``geometry`` and ``road_id``.
        """
        graph = ox.graph_from_place(city_name, network_type="drive")
        # The top-level alias is used because ``ox.utils_graph`` is not
        # available in every osmnx release.
        links = ox.graph_to_gdfs(graph, nodes=False).to_crs(city_crs)
        links = links.reset_index(drop=True)
        links["link_id"] = links.index
        # Buffer while still in the city CRS so 7 is expressed in its units.
        links["geometry"] = links["geometry"].buffer(7)
        links = links.to_crs(4326)
        links = links[["link_id", "name", "geometry"]].copy()
        # A link may carry several names as a list; flatten it to a string.
        links["name"] = links["name"].map(
            lambda name: ", ".join(name) if isinstance(name, list) else name
        )
        # Links sharing a name belong to the same road; unnamed links keep NaN.
        road_name_id = {
            name: road_id
            for road_id, name in enumerate(links["name"].dropna().unique().tolist())
        }
        links["road_id"] = links["name"].replace(road_name_id)
        return links

    def _get_buildings(self) -> "gpd.GeoDataFrame":
        """
        Read the buildings and attach the nearest road link to each of them.

        The file at ``self.population_filepath`` is read and ``self.links``
        is used for the nearest-link join. The result is also stored in
        ``self.buildings``.

        Returns:
            buildings (GeoDataFrame): Buildings in EPSG:4326 with
                ``link_id`` and ``road_id`` of the nearest link.
        """
        buildings = gpd.read_file(self.population_filepath)
        buildings = buildings[
            ["address", "building_id", "population_balanced", "geometry"]
        ]
        buildings = buildings.to_crs(4326)
        # The id from the file is replaced by the positional index.
        buildings["building_id"] = buildings.index
        # NOTE(review): both layers are in EPSG:4326 here, so max_distance is
        # presumably interpreted in degrees - confirm metres were not intended.
        nearest = gpd.sjoin_nearest(
            buildings,
            self.links[["link_id", "road_id", "geometry"]],
            how="left",
            max_distance=500,
        )
        # Equidistant links produce several rows per building; keep the first.
        buildings = nearest.drop(columns=["index_right"]).drop_duplicates(
            subset="building_id"
        )
        self.buildings = buildings
        return buildings

    def _collect_population(self) -> dict:
        """
        Collect population data for each object (building, link, road, city).

        Returns:
            dict: ``{level: {object_id: population}}`` for every level; the
                "global" level has the single key 0. Also stored in
                ``self.population``.
        """
        buildings = self.buildings.copy()
        population = buildings["population_balanced"]
        pops = {
            "global": {0: population.sum()},
            "road": buildings.groupby("road_id")["population_balanced"]
            .sum()
            .to_dict(),
            "link": buildings.groupby("link_id")["population_balanced"]
            .sum()
            .to_dict(),
            "building": population.to_dict(),
        }
        self.population = pops
        return pops

    def _preprocess(self) -> "gpd.GeoDataFrame":
        """
        Attach building, link and road ids and an importance to each message.

        Returns:
            messages (GeoDataFrame): Messages with the columns
                ``message_id``, ``text``, ``geometry``, ``building_id``,
                ``link_id``, ``road_id``, ``date_time``, ``cats``,
                ``importance`` and ``global_id``.
        """
        messages = self.messages[
            [
                "Текст комментария",
                "geometry",
                "Дата и время",
                "message_id",
                "cats",
            ]
        ]
        messages = messages.sjoin(self.buildings, how="left")[
            [
                "Текст комментария",
                "address",
                "geometry",
                "building_id",
                "message_id",
                "Дата и время",
                "cats",
            ]
        ]
        messages = messages.rename(
            columns={"Текст комментария": "text", "Дата и время": "date_time"}
        )
        messages = messages.sjoin(self.links, how="left")[
            [
                "text",
                "geometry",
                "building_id",
                "index_right",
                "name",
                "message_id",
                "date_time",
                "cats",
                "road_id",
            ]
        ]
        messages = messages.rename(
            columns={"index_right": "link_id", "name": "road_name"}
        )
        # Messages that hit no link directly inherit the link and road of
        # the building they fall into.
        messages = messages.join(
            self.buildings[["link_id", "road_id"]],
            on="building_id",
            rsuffix="_from_building",
        )
        for column in ("link_id", "road_id"):
            missing = messages[column].isna()
            messages.loc[missing, column] = messages.loc[
                missing, f"{column}_from_building"
            ]
        messages = (
            messages[
                [
                    "message_id",
                    "text",
                    "geometry",
                    "building_id",
                    "link_id",
                    "road_id",
                    "date_time",
                    "cats",
                ]
            ]
            .dropna(subset=["text"])
            .copy()
        )
        # Only the first of the "; "-separated categories is kept.
        messages["cats"] = messages["cats"].astype(str).str.split("; ").str[0]
        # Plain assignment instead of a chained in-place fillna, which may
        # silently act on a temporary copy.
        messages["importance"] = (
            messages["cats"]
            .map(self.functions_weights)
            .fillna(self._DEFAULT_IMPORTANCE)
        )
        messages["global_id"] = 0
        return messages

    def _create_model(self, min_event_size):
        """
        Create a topic model with a UMAP, HDBSCAN, and a BERTopic model.

        Args:
            min_event_size (int): Passed to HDBSCAN as ``min_cluster_size``.
        Returns:
            BERTopic: An unfitted topic model.
        """
        umap_model = UMAP(
            n_neighbors=15,
            n_components=5,
            min_dist=0.0,
            metric="cosine",
            random_state=42,
        )
        hdbscan_model = HDBSCAN(
            min_cluster_size=min_event_size,
            min_samples=1,
            metric="euclidean",
            cluster_selection_method="eom",
            prediction_data=True,
        )
        embedding_model = pipeline(
            "feature-extraction", model="cointegrated/rubert-tiny2"
        )
        return BERTopic(
            embedding_model=embedding_model,
            hdbscan_model=hdbscan_model,
            umap_model=umap_model,
            calculate_probabilities=True,
            verbose=True,
            n_gram_range=(1, 3),
        )

    def _event_from_object(
        self,
        messages,
        topic_model,
        target_column: str,
        population: dict,
        object_id: float,
        event_level: str,
    ):
        """
        Create a table of events for a given object
        (building, link, road or the whole city).

        Args:
            messages (GeoDataFrame): Preprocessed messages.
            topic_model (BERTopic): Model fitted on the object's texts.
            target_column (str): Column of ``messages`` holding object ids.
            population (dict): Output of ``_collect_population``.
            object_id (float): Id of the object to process.
            event_level (str): Level name the object belongs to.
        Returns:
            DataFrame or None: One row per topic, or None when the object has
                too few messages or the model could not be fitted.
        """
        local_messages = messages[messages[target_column] == object_id]
        if len(local_messages) < self._MIN_MESSAGES:
            return None
        message_ids = local_messages.message_id.tolist()
        docs = local_messages.text.tolist()

        try:
            topics, probs = topic_model.fit_transform(docs)
        except TypeError:
            print("Can't reduce dimensionality or some other problem")
            return None
        try:
            topics = topic_model.reduce_outliers(docs, topics)
            topic_model.update_topics(docs, topics=topics)
        except ValueError:
            # Best effort: keep the topics as fitted, outliers included.
            print("Can't distribute all messages in topics")

        event_model = topic_model.get_topic_info()
        event_model["level"] = event_level
        event_model["object_id"] = str(object_id)
        event_model["id"] = (
            event_model["Topic"].astype(str) + f"_{event_level}_{object_id}"
        )
        try:
            event_model["potential_population"] = population[event_level][
                object_id
            ]
        except KeyError:
            # No population known for this object: fall back to the city total.
            event_model["potential_population"] = population["global"][0]

        ids_by_topic = {}
        for message_id, topic in zip(message_ids, topics):
            ids_by_topic.setdefault(topic, []).append(message_id)
        event_model["message_ids"] = [
            ids_by_topic.get(topic, []) for topic in event_model["Topic"]
        ]

        # Select the messages of every event once and reuse the selection.
        event_messages = [
            messages[messages["message_id"].isin(ids)]
            for ids in event_model["message_ids"]
        ]
        durations = []
        for subset in event_messages:
            timestamps = pd.to_datetime(subset.date_time)
            durations.append((timestamps.max() - timestamps.min()).days)
        event_model["duration"] = durations
        event_model["category"] = [
            ", ".join(subset.cats.mode().tolist()) for subset in event_messages
        ]
        event_model["importance"] = [
            subset.importance.mean() for subset in event_messages
        ]
        return event_model

    def _get_events(self, min_event_size) -> "gpd.GeoDataFrame":
        """
        Create a table of events for all levels.

        Args:
            min_event_size (int): Minimal cluster size of the topic model.
        Returns:
            events (GeoDataFrame): Events of all levels in EPSG:4326.
        Raises:
            ValueError: If no object produced any event.
        """
        messages = self.messages.copy()
        pops = self._collect_population()
        topic_model = self._create_model(min_event_size)

        detected = []
        for level in reversed(self.levels):
            column = f"{level}_id"
            for object_id in messages[column].unique().tolist():
                event_model = self._event_from_object(
                    messages, topic_model, column, pops, object_id, level
                )
                if event_model is not None:
                    detected.append(event_model)
        if not detected:
            raise ValueError(
                "No events detected: no object has enough messages "
                "to fit a topic model"
            )
        events = pd.concat(detected)

        events["geometry"] = events.message_ids.map(
            lambda ids: messages[
                messages.message_id.isin(ids)
            ].geometry.unary_union.representative_point()
        )
        events = gpd.GeoDataFrame(events, geometry="geometry").set_crs(4326)
        events = events.rename(
            columns={
                "Name": "name",
                "Representative_Docs": "docs",
                "Count": "intensity",
                "potential_population": "population",
            }
        )

        # Representative docs are stored as the ids of the first message
        # carrying the same text.
        first_id_by_text = {}
        for text, message_id in zip(messages.text, messages.message_id):
            first_id_by_text.setdefault(text, message_id)
        events["docs"] = events["docs"].map(
            lambda texts: self._join_unique(
                first_id_by_text[text] for text in texts
            )
        )
        events["message_ids"] = events["message_ids"].map(self._join_unique)

        events["intensity"] = self._min_max(events["intensity"])
        events["duration"] = self._min_max(events["duration"])
        events.loc[events.intensity == 0, "intensity"] = 0.1  # fix later
        events.loc[events.duration.isna(), "duration"] = 1  # fix later
        events["risk"] = (
            events.intensity
            * events.duration
            * events.importance
            * events.population
        )
        return events

    def _get_event_connections(self) -> "gpd.GeoDataFrame":
        """
        Create a table of connections between events that share messages.

        Returns:
            connections (GeoDataFrame): Columns ``weight`` (number of shared
                messages), ``a``, ``b`` (event ids) and a line ``geometry``
                between the two events, in the CRS of the events.
        """
        events = self.events.copy()
        events.index = events.id
        events.geometry = events.centroid

        # message_ids are ", "-joined strings at this point, so they are split
        # into ids before intersecting (intersecting the raw strings would
        # compare single characters).
        weights = self._shared_message_counts(self.events.message_ids)
        pairs = list(combinations(events.id, 2))
        connections = pd.DataFrame(
            {
                "weight": weights,
                "a": [first for first, _ in pairs],
                "b": [second for _, second in pairs],
            }
        )
        connections = connections[connections["weight"] > 0]

        connections = connections.join(events.geometry.rename("geom_a"), on="a")
        connections = connections.join(events.geometry.rename("geom_b"), on="b")
        connections["geometry"] = [
            LineString([start, end])
            for start, end in zip(connections["geom_a"], connections["geom_b"])
        ]
        connections = connections.drop(columns=["geom_a", "geom_b"])
        # The lines are built from event coordinates, so they share their CRS.
        return gpd.GeoDataFrame(connections, geometry="geometry", crs=events.crs)

    def _rebalance(
        self, connections, events, levels, event_population: int, event_id: str
    ):
        """
        Rebalance the population of an event.

        The population of connected events on the given levels is subtracted
        from the event's own population.

        Args:
            connections (DataFrame): Connections with the columns ``a``, ``b``.
            events (DataFrame): Events with ``id``, ``level``, ``population``.
            levels (list): Levels whose events are accounted for.
            event_population (int): Population of the event.
            event_id (str): Id of the event.
        Returns:
            The rebalanced population; ``event_population`` unchanged when the
            event is never the ``a`` side of a connection.
        """

        def accounted_population(connected_ids):
            selected = events.id.isin(connected_ids) & events.level.isin(levels)
            return events[selected].population.sum()

        connected = connections[connections.a == event_id].b
        if len(connected) == 0:
            return event_population
        accounted = accounted_population(connected)
        if event_population < accounted:
            # More people accounted for than the event has: use the
            # connections in which the event is the ``b`` side instead.
            connected = connections[connections.b == event_id].a
            accounted = accounted_population(connected)
        return event_population - accounted

    def _rebalance_events(self) -> "gpd.GeoDataFrame":
        """
        Rebalance the population of events and recompute their risk.

        Returns:
            events (GeoDataFrame): Events with the columns ``name``, ``docs``,
                ``level``, ``id``, ``risk``, ``message_ids``, ``geometry``.
        """
        levels = self.levels.copy()
        events = self.events.copy()
        connections = self.connections.copy()

        rebalanced_levels = []
        # NOTE(review): the first level ("building") is skipped, so events of
        # that level are absent from the result - confirm this is intended.
        for position, level in enumerate(levels[1:], start=1):
            levels_to_account = levels[:position]
            level_events = events[events.level == level].copy()
            level_events["rebalanced_population"] = [
                self._rebalance(
                    connections, events, levels_to_account, population, event_id
                )
                for population, event_id in zip(
                    level_events.population, level_events.id
                )
            ]
            rebalanced_levels.append(level_events)
        events_rebalanced = pd.concat(rebalanced_levels)

        events_rebalanced["population"] = (
            events_rebalanced["rebalanced_population"].fillna(0).astype(int)
        )
        events_rebalanced = events_rebalanced.drop(
            columns=["rebalanced_population"]
        )
        events_rebalanced["population"] = self._min_max(
            events_rebalanced["population"]
        )
        events_rebalanced.loc[
            events_rebalanced.population == 0, "population"
        ] = 0.01  # fix later
        # NaN appears when every population is equal (zero value range).
        events_rebalanced.loc[
            events_rebalanced.population.isna()
            & events_rebalanced.level.isin(["building", "link"]),
            "population",
        ] = 0.01  # fix later
        events_rebalanced.loc[
            events_rebalanced.population.isna()
            & events_rebalanced.level.isin(["road", "global"]),
            "population",
        ] = 1  # fix later
        events_rebalanced["risk"] = (
            events_rebalanced.intensity
            * (events_rebalanced.duration + 1)
            * events_rebalanced.importance
        )
        return events_rebalanced[
            ["name", "docs", "level", "id", "risk", "message_ids", "geometry"]
        ]

    def _filter_outliers(self):
        """
        Drop outlier events and every connection that touches one.

        Returns:
            tuple: ``(events, connections)`` without outliers.
        """

        def is_outlier(labels):
            # astype(bool) keeps the mask boolean even for an empty Series.
            return (
                labels.astype(str)
                .str.startswith(self._OUTLIER_PREFIX)
                .astype(bool)
            )

        events = self.events
        connections = self.connections
        outlier_events = is_outlier(events["name"])
        print(
            int(outlier_events.sum()),
            "outlier clusters of",
            len(events),
            "total clusters. Filtering...",
        )
        events = events[~outlier_events]
        connections = connections[~is_outlier(connections["a"])]
        connections = connections[~is_outlier(connections["b"])]
        return events, connections

    def _prepare_messages(self):
        """
        Prepare messages for export.

        Returns:
            messages (GeoDataFrame): Messages in EPSG:4326 with the columns
                ``message_id``, ``text``, ``geometry``, ``date_time``,
                ``block``.
        """
        messages = self.messages.copy().reset_index(drop=True)
        messages = messages.rename(columns={"cats": "block"})
        messages = messages[
            ["message_id", "text", "geometry", "date_time", "block"]
        ]
        return messages.to_crs(4326)

    def run(
        self,
        target_texts: "gpd.GeoDataFrame",
        filepath_to_population: str,
        city_name: str,
        city_crs: int,
        min_event_size: int,
    ):
        """
        Run the whole event detection pipeline.

        Args:
            target_texts (GeoDataFrame): Geolocated messages.
            filepath_to_population (str): Path to the file with buildings
                and their population.
            city_name (str): Name of the city used to load the road network.
            city_crs (int): Spatial reference code (CRS) of the city.
            min_event_size (int): Minimal number of messages in an event.
        Returns:
            tuple: A GeoDataFrame of messages, a GeoDataFrame of events and
                a GeoDataFrame of connections between events.
        """
        self.population_filepath = filepath_to_population
        self.messages = target_texts.copy()
        print("messages loaded")
        self.links = self._get_roads(city_name, city_crs)
        print("road links loaded")
        self.buildings = self._get_buildings()
        print("buildings loaded")
        self.messages = self._preprocess()
        print("messages preprocessed")
        self.events = self._get_events(min_event_size)
        print("events detected")
        self.connections = self._get_event_connections()
        print("connections generated")
        self.events = self._rebalance_events()
        print("population and risk rebalanced")
        self.events, self.connections = self._filter_outliers()
        print("outliers filtered")
        self.messages = self._prepare_messages()
        print("done!")
        return self.messages, self.events, self.connections