|
import os
import random
import string
from datetime import datetime, timedelta
from math import isnan

import confuse
import folium
import geopandas as gpd
import numpy as np
import pandas as pd
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut, GeocoderUnavailable
from PIL import Image
from shapely.geometry import Point
from tqdm import tqdm
|
|
|
TOKEN_FILE = "tokens.txt"
EXPIRED_FILE = "tokens_expired.txt"

NEW_TOKEN_INSTRUCTIONS = """<div style="padding: 20px; border-radius: 5px;">
<h3 style="color: #4CAF50;">Get Your SNET API Token</h3>

<p style="color: #666; font-size: 16px;">
To get an API token for the SNET platform:
<ol>
<li>Purchase tokens on the SNET platform using AGIX</li>
<li>Enter any email address; any email will work for testing purposes</li>
<li>An API token will be generated for you</li>
</ol>

The generated token will be valid for unlimited calls to the forecasting API, across all of your fields, for 15 minutes from your first call.
</p>

<p style="color: #666; font-size: 16px;">
You can find us on the SNET platform at:
<a href="https://beta.singularitynet.io/servicedetails/org/EnigmaAi/service/farmingID" style="color: #007bff; text-decoration: none;">SNET Platform</a>
</p>
</div>
"""
|
|
|
def manage_user_tokens(current_user, api_token, valid_until):
    """
    Manages the storage and retrieval of user API tokens.

    Args:
        current_user (str): The username of the currently logged-in user.
        api_token (str): The API token to be stored.
        valid_until (str): The expiration date of the API token (in 'YYYY-MM-DD HH:MM:SS' format).

    Returns:
        pd.DataFrame: The user's token table, including the given token.
    """
    filename = f'{current_user}_tokens.csv'

    if not os.path.exists(filename):
        # First token for this user: create a fresh CSV.
        df = pd.DataFrame({'token': [api_token], 'valid_until': [valid_until]})
        df.to_csv(filename, index=False)
    else:
        df = pd.read_csv(filename)
        # Only append the token if it is not already stored.
        if api_token not in df['token'].values:
            new_row_df = pd.DataFrame({'token': [api_token], 'valid_until': [valid_until]})
            df = pd.concat([df, new_row_df], ignore_index=True)
            df.to_csv(filename, index=False)
    return df
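
# Example usage (a minimal sketch; the username, token, and timestamp are
# hypothetical):
#
#   df = manage_user_tokens('demo_user', 'abc123def456', '2030-01-01 12:00:00')
#   print(df)  # the full token table stored in demo_user_tokens.csv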
|
|
|
def generate_random_unique_tokens(num_tokens=10, token_file=TOKEN_FILE):
    '''Generates a set of random unique tokens and saves them to a file.'''
    if not os.path.exists(token_file):
        # No token file yet: generate all tokens from scratch.
        with open(token_file, 'w') as f:
            tokens = set()
            while len(tokens) < num_tokens:
                token = ''.join(random.choices(string.ascii_lowercase + string.digits, k=32))
                tokens.add(token)
            for token in tokens:
                f.write(token + '\n')
    else:
        # Top up the existing file until it holds num_tokens unique tokens.
        with open(token_file, 'r') as f:
            tokens = set(f.read().splitlines())
        with open(token_file, 'a') as f:
            while len(tokens) < num_tokens:
                token = ''.join(random.choices(string.ascii_lowercase + string.digits, k=32))
                if token not in tokens:
                    tokens.add(token)
                    f.write(token + '\n')
    return tokens
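
# Example usage (a sketch; writes to the default TOKEN_FILE in the current
# working directory):
#
#   tokens = generate_random_unique_tokens(num_tokens=5)
#   print(len(tokens))  # 5 unique 32-character tokens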
|
|
|
def confirm_api_token(token, token_file=TOKEN_FILE, expired_file=EXPIRED_FILE):
    '''Checks if the given token is valid and not expired.'''
    with open(token_file, 'r') as f:
        tokens = set(f.read().splitlines())
    if token in tokens:
        now = datetime.now()
        if token in load_expired_tokens(expired_file):
            # The token was activated earlier: honour its recorded expiry.
            if now < load_token_expiration(token, expired_file):
                return {'valid': True, 'message': 'Token is valid'}
            return {'valid': False, 'message': 'Token has expired'}
        # First use of the token: open a 15-minute validity window.
        expiry_date = now + timedelta(minutes=15)
        save_expired_token(token, expiry_date, expired_file)
        return {'valid': True, 'message': 'Token is valid'}
    return {'valid': False, 'message': 'Token is invalid'}
|
|
|
def load_expired_tokens(expired_file=EXPIRED_FILE):
    '''Loads tokens and their recorded expiry dates from the file.'''
    expired_tokens = {}
    if os.path.exists(expired_file):
        with open(expired_file, 'r') as f:
            for line in f:
                token, expiry_date = line.strip().split(',')
                expired_tokens[token] = datetime.fromisoformat(expiry_date)
    return expired_tokens
|
|
|
def load_token_expiration(token, expired_file=EXPIRED_FILE):
    '''Loads the expiration date for a given token.'''
    expired_tokens = load_expired_tokens(expired_file)
    return expired_tokens.get(token)
|
|
|
def save_expired_token(token, expiry_date, expired_file=EXPIRED_FILE):
    '''Appends a token and its expiry date to the file.'''
    # Append mode creates the file if it does not exist yet, so both the
    # first-write and subsequent-write cases collapse into one branch.
    with open(expired_file, 'a') as f:
        f.write(f"{token},{expiry_date.isoformat()}\n")
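
# End-to-end token lifecycle (a sketch; assumes the default TOKEN_FILE and
# EXPIRED_FILE paths are writable):
#
#   tokens = generate_random_unique_tokens(num_tokens=5)
#   token = next(iter(tokens))
#   result = confirm_api_token(token)    # first call opens a 15-minute window
#   print(result)                        # {'valid': True, 'message': 'Token is valid'}
#   print(load_token_expiration(token))  # activation time + 15 minutes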
|
|
|
|
|
def get_region_from_coordinates(latitude, longitude, max_retries=3):
    '''Reverse-geocodes coordinates to an administrative region name via Nominatim.'''
    geolocator = Nominatim(user_agent="my_agent")

    for attempt in range(max_retries):
        try:
            location = geolocator.reverse(f"{latitude}, {longitude}")
            if location and location.raw.get('address'):
                address = location.raw['address']

                # Prefer the most specific administrative level available.
                for level in ['state', 'county', 'region', 'province', 'district']:
                    if level in address:
                        return address[level]

                # Fall back to the country if no finer level is present.
                if 'country' in address:
                    return address['country']
            return "Region not found"
        except (GeocoderTimedOut, GeocoderUnavailable):
            if attempt == max_retries - 1:
                return "Geocoding service unavailable"

    return "Failed to retrieve region information"
|
|
|
|
|
|
|
basemaps = {
    'Google Maps': folium.TileLayer(
        tiles='https://mt1.google.com/vt/lyrs=m&x={x}&y={y}&z={z}',
        attr='Google',
        name='Google Maps',
        overlay=True,
        control=True
    ),
    'Google Satellite': folium.TileLayer(
        tiles='https://mt1.google.com/vt/lyrs=s&x={x}&y={y}&z={z}',
        attr='Google',
        name='Google Satellite',
        overlay=True,
        control=True
    ),
    'Google Terrain': folium.TileLayer(
        tiles='https://mt1.google.com/vt/lyrs=p&x={x}&y={y}&z={z}',
        attr='Google',
        name='Google Terrain',
        overlay=True,
        control=True
    ),
    'Google Satellite Hybrid': folium.TileLayer(
        tiles='https://mt1.google.com/vt/lyrs=y&x={x}&y={y}&z={z}',
        attr='Google',
        name='Google Satellite Hybrid',
        overlay=True,
        control=True
    ),
    'Esri Satellite': folium.TileLayer(
        tiles='https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
        attr='Esri',
        name='Esri Satellite',
        overlay=True,
        control=True
    ),
    'openstreetmap': folium.TileLayer('openstreetmap'),
    'cartodbdark_matter': folium.TileLayer('cartodbdark_matter')
}
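
# Example usage (a minimal sketch): attach two of the basemaps above to a
# folium map and let the user toggle them from the layer control.
#
#   m = folium.Map(location=[0, 0], zoom_start=3)
#   basemaps['Google Satellite'].add_to(m)
#   basemaps['openstreetmap'].add_to(m)
#   folium.LayerControl().add_to(m)
#   m.save('basemaps_demo.html')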
|
|
|
|
|
|
|
# Load all JavaScript snippets from the scripts directory, keyed by the
# upper-cased file name (without extension).
scripts_dir = './scripts/'
scripts_files = [f for f in os.listdir(scripts_dir) if f.endswith('.js')]
Scripts = {}
for filename in scripts_files:
    key = filename.split('.')[0].upper()
    with open(os.path.join(scripts_dir, filename)) as f:
        Scripts[key] = f.read()
|
|
|
def calculate_bbox(df, field):
    '''
    Calculate the bounding box of a specific field ID in a given data frame.
    '''
    bbox = df.loc[df['name'] == field].bounds
    r = bbox.iloc[0]
    return [r.minx, r.miny, r.maxx, r.maxy]
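
# Example usage (a sketch; assumes a GeoDataFrame of field polygons with a
# 'name' column, and 'field_1' is a hypothetical field ID):
#
#   minx, miny, maxx, maxy = calculate_bbox(fields_gdf, 'field_1')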
|
|
|
def tiff_to_geodataframe(im, metric, date, crs):
    '''
    Convert a tiff image to a geodataframe.
    '''
    x_cords = im.coords['x'].values
    y_cords = im.coords['y'].values
    vals = im.values
    dims = vals.shape
    points = []
    v_s = []
    # Walk the raster grid, keeping one point per non-NaN pixel
    # (a single-band raster is assumed, hence v.item()).
    for lat in range(dims[1]):
        y = y_cords[lat]
        for lon in range(dims[2]):
            x = x_cords[lon]
            v = vals[:, lat, lon]
            if isnan(v[0]):
                continue
            points.append(Point(x, y))
            v_s.append(v.item())
    d = {f'{metric}_{date}': v_s, 'geometry': points}
    df = gpd.GeoDataFrame(d, crs=crs)
    return df
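
# Example usage (a sketch; assumes rioxarray is installed, a single-band
# GeoTIFF exists at the given path, and 'NDVI' / '2024-06-01' are placeholder
# labels):
#
#   import rioxarray
#   im = rioxarray.open_rasterio('response.tiff')
#   gdf = tiff_to_geodataframe(im, 'NDVI', '2024-06-01', im.rio.crs)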
|
|
|
def get_bearer_token_headers(bearer_token):
    '''
    Get the bearer token headers to be used in the request to the SentinelHub API.
    '''
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + bearer_token,
    }
    return headers
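
# Example usage (a sketch; the bearer token is hypothetical and the request
# payload depends on the SentinelHub Process API call being made):
#
#   import requests
#   headers = get_bearer_token_headers('my-oauth-token')
#   resp = requests.post('https://services.sentinel-hub.com/api/v1/process',
#                        headers=headers, json=request_payload)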
|
|
|
def get_downloaded_location_img_path(clientName, metric, date, field, extension='tiff'):
    '''
    Get the path of the downloaded TIFF image based on the client name, metric, date, and field ID.
    '''
    date_dir = f'./data/{clientName}/raw/{metric}/{date}/field_{field}/'
    os.makedirs(date_dir, exist_ok=True)
    intermediate_dirs = os.listdir(date_dir)
    if len(intermediate_dirs) == 0:
        return None
    imagePath = f'{date_dir}{intermediate_dirs[0]}/response.{extension}'
    if not os.path.exists(imagePath):
        return None
    return imagePath
|
|
|
def get_masked_location_img_path(clientName, metric, date, field):
    '''
    Get the path of the masked TIFF image based on the client name, metric, date, and field ID.
    '''
    date_dir = f'./data/{clientName}/processed/{metric}/{date}/field_{field}/'
    imagePath = date_dir + 'masked.tiff'
    return imagePath


def get_curated_location_img_path(clientName, metric, date, field):
    '''
    Get the path of the masked image converted to GeoJSON format, based on the client name, metric, date, and field ID.
    '''
    date_dir = f'./data/{clientName}/curated/{metric}/{date}/field_{field}/'
    imagePath = date_dir + 'masked.geojson'

    if os.path.exists(imagePath):
        return imagePath
    else:
        return None
|
|
|
def parse_app_config(path=r'config-fgm-dev.yaml'):
    config = confuse.Configuration('CropHealth', __name__)
    config.set_file(path)
    return config
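
# Example usage (a sketch; assumes config-fgm-dev.yaml exists and contains a
# hypothetical 'app.name' entry):
#
#   config = parse_app_config()
#   app_name = config['app']['name'].get(str)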
|
|
|
|
|
def fix_image(img):
    '''Brighten, gamma-correct, and normalize each band of an RGB image array.'''
    def normalize(band):
        band_min, band_max = (band.min(), band.max())
        return (band - band_min) / (band_max - band_min)

    def brighten(band):
        alpha = 3
        beta = 0
        return np.clip(alpha * band + beta, 0, 255)

    def gammacorr(band):
        gamma = 0.9
        return np.power(band, 1 / gamma)

    red = img[:, :, 0]
    green = img[:, :, 1]
    blue = img[:, :, 2]
    # Apply brightening, then gamma correction, then normalization per band,
    # and stack the fully processed bands into the composite.
    red_bgn = normalize(gammacorr(brighten(red)))
    green_bgn = normalize(gammacorr(brighten(green)))
    blue_bgn = normalize(gammacorr(brighten(blue)))
    rgb_composite_bgn = np.dstack((red_bgn, green_bgn, blue_bgn))
    return rgb_composite_bgn
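
# Example usage (a sketch; assumes 'img' is a float RGB array, e.g. true-color
# bands scaled to [0, 1]):
#
#   import matplotlib.pyplot as plt
#   plt.imshow(fix_image(img))
#   plt.show()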
|
|
|
|
|
def creat_gif(dataset, gif_name, duration=50):
    '''
    Create a gif from a list of images.
    '''
    imgs = [Image.fromarray((255 * img).astype(np.uint8)) for img in dataset]
    imgs[0].save(gif_name, save_all=True, append_images=imgs[1:], duration=duration, loop=1)
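
# Example usage (a sketch; assumes 'frames' is a list of arrays with values in
# [0, 1], e.g. outputs of fix_image):
#
#   creat_gif(frames, 'field_timelapse.gif', duration=200)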
|
|
|
|
|
def add_lat_lon_to_gdf_from_geometry(gdf):
    # Latitude is the y coordinate of each point; longitude is the x coordinate.
    gdf['Lat'] = gdf['geometry'].apply(lambda p: p.y)
    gdf['Lon'] = gdf['geometry'].apply(lambda p: p.x)
    return gdf
|
|
|
def gdf_column_to_one_band_array(gdf, column_name):
    '''Reshape a GeoDataFrame column into a 2-D (lat x lon) single-band array.'''
    gdf = gdf.sort_values(by=['Lat', 'Lon'])
    gdf = gdf.reset_index(drop=True)
    unique_lats_count = gdf['Lat'].nunique()
    unique_lons_count = gdf['Lon'].nunique()
    rows_arr = [[] for i in range(unique_lats_count)]
    column_values = gdf[column_name].values
    # Assign each value to a row, assuming a regular grid with
    # unique_lons_count columns per row.
    for i in tqdm(range(len(column_values))):
        row_index = i // unique_lons_count
        rows_arr[row_index].append(column_values[i])

    # Pad ragged rows with zeros so the result is rectangular.
    max_row_length = max([len(row) for row in rows_arr])
    for row in rows_arr:
        while len(row) < max_row_length:
            row.append(0)

    rows_arr = np.array(rows_arr)
    return rows_arr
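
# Example usage (a sketch; assumes the GeoDataFrame came from
# tiff_to_geodataframe and was passed through add_lat_lon_to_gdf_from_geometry,
# with 'NDVI_2024-06-01' as a placeholder column name):
#
#   gdf = add_lat_lon_to_gdf_from_geometry(gdf)
#   band = gdf_column_to_one_band_array(gdf, 'NDVI_2024-06-01')
#   print(band.shape)  # (n_unique_lats, n_unique_lons) on a regular grid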