Spaces:
Sleeping
Sleeping
# Copyright (c) Meta Platforms, Inc. and affiliates. | |
import json | |
import re | |
from dataclasses import dataclass, field | |
from pathlib import Path | |
from typing import Any, Dict, List, Optional | |
from lxml import etree | |
import numpy as np | |
from utils.geo import BoundaryBox, Projection | |
METERS_PATTERN: re.Pattern = re.compile("^(?P<value>\\d*\\.?\\d*)\\s*m$") | |
KILOMETERS_PATTERN: re.Pattern = re.compile("^(?P<value>\\d*\\.?\\d*)\\s*km$") | |
MILES_PATTERN: re.Pattern = re.compile("^(?P<value>\\d*\\.?\\d*)\\s*mi$") | |
def parse_float(string: str) -> Optional[float]: | |
"""Parse string representation of a float or integer value.""" | |
try: | |
return float(string) | |
except (TypeError, ValueError): | |
return None | |
class OSMElement: | |
""" | |
Something with tags (string to string mapping). | |
""" | |
id_: int | |
tags: Dict[str, str] | |
def get_float(self, key: str) -> Optional[float]: | |
"""Parse float from tag value.""" | |
if key in self.tags: | |
return parse_float(self.tags[key]) | |
return None | |
def get_length(self, key: str) -> Optional[float]: | |
"""Get length in meters.""" | |
if key not in self.tags: | |
return None | |
value: str = self.tags[key] | |
float_value: float = parse_float(value) | |
if float_value is not None: | |
return float_value | |
for pattern, ratio in [ | |
(METERS_PATTERN, 1.0), | |
(KILOMETERS_PATTERN, 1000.0), | |
(MILES_PATTERN, 1609.344), | |
]: | |
matcher: re.Match = pattern.match(value) | |
if matcher: | |
float_value: float = parse_float(matcher.group("value")) | |
if float_value is not None: | |
return float_value * ratio | |
return None | |
def __hash__(self) -> int: | |
return self.id_ | |
class OSMNode(OSMElement): | |
""" | |
OpenStreetMap node. | |
See https://wiki.openstreetmap.org/wiki/Node | |
""" | |
geo: np.ndarray | |
visible: Optional[str] = None | |
xy: Optional[np.ndarray] = None | |
def from_dict(cls, structure: Dict[str, Any]) -> "OSMNode": | |
""" | |
Parse node from Overpass-like structure. | |
:param structure: input structure | |
""" | |
return cls( | |
structure["id"], | |
structure.get("tags", {}), | |
geo=np.array((structure["lat"], structure["lon"])), | |
visible=structure.get("visible"), | |
) | |
class OSMWay(OSMElement): | |
""" | |
OpenStreetMap way. | |
See https://wiki.openstreetmap.org/wiki/Way | |
""" | |
nodes: Optional[List[OSMNode]] = field(default_factory=list) | |
visible: Optional[str] = None | |
def from_dict( | |
cls, structure: Dict[str, Any], nodes: Dict[int, OSMNode] | |
) -> "OSMWay": | |
""" | |
Parse way from Overpass-like structure. | |
:param structure: input structure | |
:param nodes: node structure | |
""" | |
return cls( | |
structure["id"], | |
structure.get("tags", {}), | |
[nodes[x] for x in structure["nodes"]], | |
visible=structure.get("visible"), | |
) | |
def is_cycle(self) -> bool: | |
"""Is way a cycle way or an area boundary.""" | |
return self.nodes[0] == self.nodes[-1] | |
def __repr__(self) -> str: | |
return f"Way <{self.id_}> {self.nodes}" | |
class OSMMember: | |
""" | |
Member of OpenStreetMap relation. | |
""" | |
type_: str | |
ref: int | |
role: str | |
class OSMRelation(OSMElement): | |
""" | |
OpenStreetMap relation. | |
See https://wiki.openstreetmap.org/wiki/Relation | |
""" | |
members: Optional[List[OSMMember]] | |
visible: Optional[str] = None | |
def from_dict(cls, structure: Dict[str, Any]) -> "OSMRelation": | |
""" | |
Parse relation from Overpass-like structure. | |
:param structure: input structure | |
""" | |
return cls( | |
structure["id"], | |
structure["tags"], | |
[OSMMember(x["type"], x["ref"], x["role"]) for x in structure["members"]], | |
visible=structure.get("visible"), | |
) | |
class OSMData: | |
""" | |
The whole OpenStreetMap information about nodes, ways, and relations. | |
""" | |
def __init__(self) -> None: | |
self.nodes: Dict[int, OSMNode] = {} | |
self.ways: Dict[int, OSMWay] = {} | |
self.relations: Dict[int, OSMRelation] = {} | |
self.box: BoundaryBox = None | |
def from_dict(cls, structure: Dict[str, Any]): | |
data = cls() | |
bounds = structure.get("bounds") | |
if bounds is not None: | |
data.box = BoundaryBox( | |
np.array([bounds["minlat"], bounds["minlon"]]), | |
np.array([bounds["maxlat"], bounds["maxlon"]]), | |
) | |
for element in structure["elements"]: | |
if element["type"] == "node": | |
node = OSMNode.from_dict(element) | |
data.add_node(node) | |
for element in structure["elements"]: | |
if element["type"] == "way": | |
way = OSMWay.from_dict(element, data.nodes) | |
data.add_way(way) | |
for element in structure["elements"]: | |
if element["type"] == "relation": | |
relation = OSMRelation.from_dict(element) | |
data.add_relation(relation) | |
return data | |
def from_json(cls, path: Path): | |
with path.open(encoding='utf-8') as fid: | |
structure = json.load(fid) | |
return cls.from_dict(structure) | |
def from_xml(cls, path: Path): | |
root = etree.parse(str(path)).getroot() | |
structure = {"elements": []} | |
from tqdm import tqdm | |
for elem in tqdm(root): | |
if elem.tag == "bounds": | |
structure["bounds"] = { | |
k: float(elem.attrib[k]) | |
for k in ("minlon", "minlat", "maxlon", "maxlat") | |
} | |
elif elem.tag in {"node", "way", "relation"}: | |
if elem.tag == "node": | |
item = { | |
"id": int(elem.attrib["id"]), | |
"lat": float(elem.attrib["lat"]), | |
"lon": float(elem.attrib["lon"]), | |
"visible": elem.attrib.get("visible"), | |
"tags": { | |
x.attrib["k"]: x.attrib["v"] for x in elem if x.tag == "tag" | |
}, | |
} | |
elif elem.tag == "way": | |
item = { | |
"id": int(elem.attrib["id"]), | |
"visible": elem.attrib.get("visible"), | |
"tags": { | |
x.attrib["k"]: x.attrib["v"] for x in elem if x.tag == "tag" | |
}, | |
"nodes": [int(x.attrib["ref"]) for x in elem if x.tag == "nd"], | |
} | |
elif elem.tag == "relation": | |
item = { | |
"id": int(elem.attrib["id"]), | |
"visible": elem.attrib.get("visible"), | |
"tags": { | |
x.attrib["k"]: x.attrib["v"] for x in elem if x.tag == "tag" | |
}, | |
"members": [ | |
{ | |
"type": x.attrib["type"], | |
"ref": int(x.attrib["ref"]), | |
"role": x.attrib["role"], | |
} | |
for x in elem | |
if x.tag == "member" | |
], | |
} | |
item["type"] = elem.tag | |
structure["elements"].append(item) | |
elem.clear() | |
del root | |
return cls.from_dict(structure) | |
def from_file(cls, path: Path): | |
ext = path.suffix | |
if ext == ".json": | |
return cls.from_json(path) | |
elif ext in {".osm", ".xml"}: | |
return cls.from_xml(path) | |
else: | |
raise ValueError(f"Unknown extension for {path}") | |
def add_node(self, node: OSMNode): | |
"""Add node and update map parameters.""" | |
if node.id_ in self.nodes: | |
raise ValueError(f"Node with duplicate id {node.id_}.") | |
self.nodes[node.id_] = node | |
def add_way(self, way: OSMWay): | |
"""Add way and update map parameters.""" | |
if way.id_ in self.ways: | |
raise ValueError(f"Way with duplicate id {way.id_}.") | |
self.ways[way.id_] = way | |
def add_relation(self, relation: OSMRelation): | |
"""Add relation and update map parameters.""" | |
if relation.id_ in self.relations: | |
raise ValueError(f"Relation with duplicate id {relation.id_}.") | |
self.relations[relation.id_] = relation | |
def add_xy_to_nodes(self, proj: Projection): | |
nodes = list(self.nodes.values()) | |
if len(nodes) == 0: | |
return | |
geos = np.stack([n.geo for n in nodes], 0) | |
if proj.bounds is not None: | |
# For some reasons few nodes are sometimes very far off the initial bbox. | |
valid = proj.bounds.contains(geos) | |
if valid.mean() < 0.9: | |
print("Many nodes are out of the projection bounds.") | |
xys = np.zeros_like(geos) | |
xys[valid] = proj.project(geos[valid]) | |
else: | |
xys = proj.project(geos) | |
for xy, node in zip(xys, nodes): | |
node.xy = xy | |