Spaces:
Sleeping
Sleeping
File size: 6,826 Bytes
ef35e05 |
|
import osmnx as ox
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2
class pub_crawl:
    """Plan, optimise and plot a walking route between pubs on a street graph.

    Each pub is snapped to its nearest node in an OSMnx street network.
    Distances between pubs are shortest-path lengths over the edge 'length'
    attribute, and the visiting order is optimised with Google OR-Tools.

    Attributes:
        df: the pubs DataFrame passed to the constructor.
        G: the street network passed to the constructor.
        pub_names: pub names in DataFrame order.
        initial_route: copy of ``pub_names`` (the un-optimised visiting order).
        optimal_route: pub names in optimised order, or None until
            ``optimise`` has found a solution.
        optimal_distance: length of ``optimal_route`` in the units of the
            edge 'length' attribute (rounded), or None.
        pub_nodes: dict mapping pub name -> nearest graph node id.
    """

    def __init__(self, df, G):
        """Initialise a pub_crawl instance.

        Args:
            df: pd.DataFrame with one row per pub, containing the columns
                'name', 'latitude' and 'longitude'.
            G: nx.MultiDiGraph of the map area whose edges carry a 'length'
                attribute.
        """
        self.df = df
        self.G = G
        self.pub_names = df['name'].to_list()
        # Copy so that mutating one list can never silently change the other.
        self.initial_route = list(self.pub_names)
        self.optimal_route = None
        self.optimal_distance = None
        self.pub_nodes = self.create_pub_nodes()

    def create_pub_nodes(self):
        """Return a dict mapping each pub name to its nearest graph node.

        The coordinate columns are read by name, so the result does not
        depend on column order or on which other columns (e.g. 'address')
        happen to be present in the DataFrame.
        """
        pub_nodes = {}
        rows = zip(self.df['name'], self.df['latitude'], self.df['longitude'])
        for name, latitude, longitude in rows:
            # OSMnx expects X = longitude and Y = latitude.
            pub_nodes[name] = ox.nearest_nodes(
                self.G, X=float(longitude), Y=float(latitude))
        return pub_nodes

    def get_route_length(self, p0, p1):
        """Return the length of the shortest path from pub p0 to pub p1.

        The result is in the units of the edge 'length' attribute (metres
        for OSMnx graphs) and is 0 when both pubs map to the same node.

        Raises:
            KeyError: if either pub name is unknown.
            nx.NetworkXNoPath: if no path connects the two pubs.
        """
        # Dijkstra already yields the total weight, so there is no need to
        # build the path first and then sum its edge lengths.
        return nx.shortest_path_length(
            self.G, self.pub_nodes[p0], self.pub_nodes[p1], weight='length')

    def create_distance_matrix(self, pubs_considered):
        """Return the square matrix of pairwise route lengths as integers.

        Entry [i][j] is the route length from pubs_considered[i] to
        pubs_considered[j], multiplied by 1000 and rounded. The scaling
        keeps three decimal places of precision once the values are
        integers, which avoids rounding error inside OR-Tools.
        """
        return [
            [round(self.get_route_length(origin, destination) * 1000)
             for destination in pubs_considered]
            for origin in pubs_considered
        ]

    def _annotate_nodes(self, ax, labelled_nodes):
        """Write a text label next to each given graph node on ``ax``.

        Args:
            ax: matplotlib axes the graph was drawn on.
            labelled_nodes: iterable of (label, node_id) pairs.
        """
        nodes_gdf = ox.graph_to_gdfs(self.G, nodes=True, edges=False)
        for label, node_id in labelled_nodes:
            # Look the node up directly instead of scanning every graph node.
            point = nodes_gdf.loc[node_id, 'geometry'].centroid
            ax.annotate(label, xy=(point.x, point.y), xycoords='data',
                        xytext=(3, -2), textcoords='offset points', size=8)

    def plot_map(self):
        """Show the street graph with pub nodes in red and labelled by name."""
        # Build the lookup set once rather than once per graph node.
        pub_node_ids = set(self.pub_nodes.values())
        node_colours = ['#FF0000' if node in pub_node_ids else '#999999'
                        for node in self.G.nodes]
        fig, ax = ox.plot_graph(self.G, bgcolor='#FFFFFF',
                                node_color=node_colours,
                                show=False, close=False)
        self._annotate_nodes(ax, self.pub_nodes.items())
        plt.show()

    def get_route_nodes(self, route):
        """Return one shortest node path per consecutive pair of pubs.

        Args:
            route: sequence of pub names in visiting order.

        Returns:
            A list with len(route) - 1 node lists (empty when the route has
            fewer than two pubs), suitable for ``ox.plot_graph_routes``.
        """
        return [
            nx.shortest_path(self.G, self.pub_nodes[origin],
                             self.pub_nodes[destination], weight='length')
            for origin, destination in zip(route, route[1:])
        ]

    def plot_route(self, route):
        """Draw the route on the street graph and return the figure.

        Every pub is labelled '<position>: <name>'. The figure is neither
        shown nor closed, so the caller decides how to display it.
        """
        route_nodes = self.get_route_nodes(route)
        fig, ax = ox.plot_graph_routes(self.G, route_nodes, bgcolor='#FFFFFF',
                                       show=False, close=False,
                                       figsize=(12, 12))
        self._annotate_nodes(
            ax,
            ((f'{i}: {name}', self.pub_nodes[name])
             for i, name in enumerate(route)))
        return fig

    def create_data(self, start, pubs_considered):
        """Build the data dict consumed by the OR-Tools routing model.

        Args:
            start: name of the starting pub; must be in pubs_considered.
            pubs_considered: list of pub names to visit.

        Returns:
            dict with keys 'distance_matrix', 'num_vehicles' (always 1) and
            'depot' (index of ``start`` in ``pubs_considered``).

        Raises:
            ValueError: if ``start`` is not in ``pubs_considered``.
        """
        return {
            'distance_matrix': self.create_distance_matrix(pubs_considered),
            'num_vehicles': 1,
            'depot': pubs_considered.index(start),
        }

    def format_solution(self, manager, routing, solution, pubs_considered):
        """Store the solver's route as pub names plus its total length.

        Sets ``self.optimal_route`` (pub names, starting at the depot, with
        the return to the depot left out) and ``self.optimal_distance``
        (sum of the visited legs, scaled back down by 1000 and rounded).
        """
        index = routing.Start(0)
        # Work in node numbers throughout: routing indices and node numbers
        # are different id spaces, so never compare one with the other.
        depot = manager.IndexToNode(index)
        route = [depot]
        route_distance = 0
        while not routing.IsEnd(index):
            previous_index = index
            index = solution.Value(routing.NextVar(index))
            node = manager.IndexToNode(index)
            if node == depot:
                # Back at the start: the closing leg is not part of the crawl.
                break
            route.append(node)
            route_distance += routing.GetArcCostForVehicle(
                previous_index, index, 0)
        self.optimal_route = [pubs_considered[i] for i in route]
        self.optimal_distance = round(route_distance / 1000)

    def optimise(self, start_point, pubs_considered, time_limit_seconds=5):
        """Find a short visiting order with OR-Tools simulated annealing.

        On success the result is stored in ``self.optimal_route`` and
        ``self.optimal_distance``; if the solver finds no solution both are
        left as None.

        Args:
            start_point: name of the pub to start from; must be in
                pubs_considered.
            pubs_considered: list of pub names to visit.
            time_limit_seconds: solver time budget in whole seconds.

        Raises:
            ValueError: if ``start_point`` is not in ``pubs_considered``.
        """
        # Never leave the result of an earlier call behind if this one fails.
        self.optimal_route = None
        self.optimal_distance = None

        data = self.create_data(start_point, pubs_considered)
        distance_matrix = data['distance_matrix']

        # Create the routing index manager
        manager = pywrapcp.RoutingIndexManager(
            len(distance_matrix), data['num_vehicles'], data['depot'])

        # Create Routing Model
        routing = pywrapcp.RoutingModel(manager)

        def distance_callback(from_index, to_index):
            """Returns the distance between the two nodes."""
            # Convert from routing variable Index to distance matrix NodeIndex
            from_node = manager.IndexToNode(from_index)
            to_node = manager.IndexToNode(to_index)
            return distance_matrix[from_node][to_node]

        transit_callback_index = routing.RegisterTransitCallback(
            distance_callback)

        # Define cost of each arc
        # NOTE(review): the solver minimises the closed tour (including the
        # leg back to the start), while format_solution reports the open
        # path. Confirm whether the return leg should be cost-free.
        routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

        # Settings for simulated annealing
        search_parameters = pywrapcp.DefaultRoutingSearchParameters()
        search_parameters.local_search_metaheuristic = (
            routing_enums_pb2.LocalSearchMetaheuristic.SIMULATED_ANNEALING)
        search_parameters.time_limit.seconds = time_limit_seconds
        search_parameters.log_search = True

        # Solve the problem
        solution = routing.SolveWithParameters(search_parameters)

        # Format solution
        if solution:
            self.format_solution(manager, routing, solution, pubs_considered)
def main():
    """Demo: plot Galway pubs, then optimise and plot a three-pub crawl.

    Reads 'galway_pubs.csv' and 'galway.graphml' from the current working
    directory.
    """
    df = pd.read_csv('galway_pubs.csv')
    G = ox.io.load_graphml('galway.graphml')
    crawler = pub_crawl(df, G)

    print('Locations of pubs in Galway to consider:')
    crawler.plot_map()

    print('Initial route before optimisation (default pub ordering):')
    crawler.plot_route(crawler.initial_route)
    # plot_route only builds the figure; show it explicitly when run as a script.
    plt.show()

    start_pub = 'The Sliding Rock'
    crawler.optimise(start_pub, ['The Sliding Rock', 'The Salt House', 'Caribou'])
    optimal_route = crawler.optimal_route
    if optimal_route is None:
        # The solver found nothing within its time limit; nothing to plot.
        print('No optimised route found.')
        return

    print('Optimised route:')
    print(f'Route distance: {crawler.optimal_distance} meters')
    crawler.plot_route(optimal_route)
    plt.show()


# The module name of a directly executed script is '__main__', not 'main'.
if __name__ == '__main__':
    main()
|