# Copyright (c) Meta Platforms, Inc. and affiliates. import logging from dataclasses import dataclass, field from typing import Dict, List, Optional, Set, Tuple import numpy as np from .parser import ( filter_area, filter_node, filter_way, match_to_group, parse_area, parse_node, parse_way, Patterns, ) from .reader import OSMData, OSMNode, OSMRelation, OSMWay logger = logging.getLogger(__name__) def glue(ways: List[OSMWay]) -> List[List[OSMNode]]: result: List[List[OSMNode]] = [] to_process: Set[Tuple[OSMNode]] = set() for way in ways: if way.is_cycle(): result.append(way.nodes) else: to_process.add(tuple(way.nodes)) while to_process: nodes: List[OSMNode] = list(to_process.pop()) glued: Optional[List[OSMNode]] = None other_nodes: Optional[Tuple[OSMNode]] = None for other_nodes in to_process: glued = try_to_glue(nodes, list(other_nodes)) if glued is not None: break if glued is not None: to_process.remove(other_nodes) if is_cycle(glued): result.append(glued) else: to_process.add(tuple(glued)) else: result.append(nodes) return result def is_cycle(nodes: List[OSMNode]) -> bool: """Is way a cycle way or an area boundary.""" return nodes[0] == nodes[-1] def try_to_glue(nodes: List[OSMNode], other: List[OSMNode]) -> Optional[List[OSMNode]]: """Create new combined way if ways share endpoints.""" if nodes[0] == other[0]: return list(reversed(other[1:])) + nodes if nodes[0] == other[-1]: return other[:-1] + nodes if nodes[-1] == other[-1]: return nodes + list(reversed(other[:-1])) if nodes[-1] == other[0]: return nodes + other[1:] return None def multipolygon_from_relation(rel: OSMRelation, osm: OSMData): inner_ways = [] outer_ways = [] for member in rel.members: if member.type_ == "way": if member.role == "inner": if member.ref in osm.ways: inner_ways.append(osm.ways[member.ref]) elif member.role == "outer": if member.ref in osm.ways: outer_ways.append(osm.ways[member.ref]) else: logger.warning(f'Unknown member role "{member.role}".') if outer_ways: inners_path = glue(inner_ways) outers_path = glue(outer_ways) return inners_path, outers_path @dataclass class MapElement: id_: int label: str group: str tags: Optional[Dict[str, str]] @dataclass class MapNode(MapElement): xy: np.ndarray @classmethod def from_osm(cls, node: OSMNode, label: str, group: str): return cls( node.id_, label, group, node.tags, xy=node.xy, ) @dataclass class MapLine(MapElement): xy: np.ndarray @classmethod def from_osm(cls, way: OSMWay, label: str, group: str): xy = np.stack([n.xy for n in way.nodes]) return cls( way.id_, label, group, way.tags, xy=xy, ) @dataclass class MapArea(MapElement): outers: List[np.ndarray] inners: List[np.ndarray] = field(default_factory=list) @classmethod def from_relation(cls, rel: OSMRelation, label: str, group: str, osm: OSMData): outers_inners = multipolygon_from_relation(rel, osm) if outers_inners is None: return None outers, inners = outers_inners outers = [np.stack([n.xy for n in way]) for way in outers] inners = [np.stack([n.xy for n in way]) for way in inners] return cls( rel.id_, label, group, rel.tags, outers=outers, inners=inners, ) @classmethod def from_way(cls, way: OSMWay, label: str, group: str): xy = np.stack([n.xy for n in way.nodes]) return cls( way.id_, label, group, way.tags, outers=[xy], ) class MapData: def __init__(self): self.nodes: Dict[int, MapNode] = {} self.lines: Dict[int, MapLine] = {} self.areas: Dict[int, MapArea] = {} @classmethod def from_osm(cls, osm: OSMData): self = cls() for node in filter(filter_node, osm.nodes.values()): label = parse_node(node.tags) if label is None: continue group = match_to_group(label, Patterns.nodes) if group is None: group = match_to_group(label, Patterns.ways) if group is None: continue # missing self.nodes[node.id_] = MapNode.from_osm(node, label, group) for way in filter(filter_way, osm.ways.values()): label = parse_way(way.tags) if label is None: continue group = match_to_group(label, Patterns.ways) if group is None: group = match_to_group(label, Patterns.nodes) if group is None: continue # missing self.lines[way.id_] = MapLine.from_osm(way, label, group) for area in filter(filter_area, osm.ways.values()): label = parse_area(area.tags) if label is None: continue group = match_to_group(label, Patterns.areas) if group is None: group = match_to_group(label, Patterns.ways) if group is None: group = match_to_group(label, Patterns.nodes) if group is None: continue # missing self.areas[area.id_] = MapArea.from_way(area, label, group) for rel in osm.relations.values(): if rel.tags.get("type") != "multipolygon": continue label = parse_area(rel.tags) if label is None: continue group = match_to_group(label, Patterns.areas) if group is None: group = match_to_group(label, Patterns.ways) if group is None: group = match_to_group(label, Patterns.nodes) if group is None: continue # missing area = MapArea.from_relation(rel, label, group, osm) assert rel.id_ not in self.areas # not sure if there can be collision if area is not None: self.areas[rel.id_] = area return self