File size: 2,524 Bytes
6a0f6c2
640dc7a
 
84d8075
640dc7a
 
 
 
 
 
 
 
 
84d8075
 
640dc7a
 
35153c2
640dc7a
 
 
 
 
 
35153c2
640dc7a
35153c2
 
 
 
29ff8b7
6a0f6c2
 
 
 
29ff8b7
 
 
 
 
35153c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a0f6c2
35153c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import json
import os

import duckdb

# Configure DuckDB connection
if not os.getenv("motherduck_token"):
    raise Exception(
        "No motherduck token found. Please set the `motherduck_token` environment variable."
    )
else:
    con = duckdb.connect("md:climatebase")
    con.sql("USE climatebase;")
    # load extensions
    con.sql("""INSTALL spatial; LOAD spatial;""")


# to-do: pass con through decorator
def list_projects_by_author(author_id):
    return con.execute(
        "SELECT DISTINCT name FROM project WHERE authorId = ? AND geometry != 'null'",
        [author_id],
    ).df()


def get_project_geometry(project_name):
    return con.execute(
        "SELECT geometry FROM project WHERE name = ? LIMIT 1", [project_name]
    ).fetchall()


def get_project_centroid(project_name):
    # Workaround to get centroid of project
    # To-do: refactor to only use DuckDB spatial extension
    _geom = get_project_geometry(project_name)
    _polygon = json.dumps(json.loads(_geom[0][0])["features"][0]["geometry"])
    return con.sql(
        f"SELECT ST_X(ST_Centroid(ST_GeomFromGeoJSON('{_polygon}'))) AS longitude, ST_Y(ST_Centroid(ST_GeomFromGeoJSON('{_polygon}'))) AS latitude;"
    ).fetchall()[0]


def get_project_scores(project_name, start_year, end_year):
    return con.execute(
        "SELECT * FROM bioindicator WHERE (year >= ? AND year <= ? AND project_name = ?)",
        [start_year, end_year, project_name],
    ).df()


def check_if_project_exists_for_year(project_name, year):
    return con.execute(
        "SELECT COUNT(1) FROM bioindicator WHERE (year = ? AND project_name = ?)",
        [year, project_name],
    ).fetchall()[0][0]


def write_score_to_temptable(df):
    con.sql(
        "CREATE OR REPLACE TABLE _temptable AS SELECT *, (value * area) AS score FROM (SELECT year, project_name, AVG(value) AS value, area  FROM df GROUP BY year, project_name, area ORDER BY project_name)"
    )
    return True


def get_or_create_bioindicator_table():
    con.sql(
        """
            USE climatebase;
            CREATE TABLE IF NOT EXISTS bioindicator (year BIGINT, project_name VARCHAR(255), value DOUBLE, area DOUBLE, score DOUBLE, CONSTRAINT unique_year_project_name UNIQUE (year, project_name));
            """
    )
    return True


def upsert_project_record():
    con.sql(
        """
                INSERT INTO bioindicator FROM _temptable
                ON CONFLICT (year, project_name) DO UPDATE SET value = excluded.value;
            """
    )
    return True