# NOTE: page residue captured with this listing (not code): "Spaces: Runtime error"
# Import libraries | |
import os | |
import streamlit as st | |
import pandas as pd | |
import platform, uuid, psutil | |
import requests | |
import json | |
from requests import get | |
from geopy.geocoders import Nominatim | |
from getmac import get_mac_address as gma | |
from pathlib import Path | |
from streamlit_extras.switch_page_button import switch_page | |
from geopy.distance import geodesic as GD | |
from datetime import datetime, timedelta | |
import matplotlib | |
# from face_verification_helper import face_verfication | |
# Security | |
#passlib,hashlib,bcrypt,scrypt | |
import hashlib | |
# DB Management | |
import sqlite3 | |
def make_hashes(password):
    """Return the hexadecimal SHA-256 digest of *password*.

    NOTE(review): this is a plain, unsalted SHA-256; a dedicated password
    hashing scheme would be stronger, but changing it would invalidate the
    hashes already stored in the ``users`` table.
    """
    password_bytes = str.encode(password)
    digest = hashlib.sha256(password_bytes)
    return digest.hexdigest()
def check_hashes(password, hashed_text):
    """Compare *password* against a stored hash.

    Returns *hashed_text* itself when the hash of *password* equals it,
    otherwise ``False``.
    """
    candidate = make_hashes(password)
    return hashed_text if candidate == hashed_text else False
# DB Functions | |
# Create table store username and password | |
def create_user_table():
    """Create the ``users`` table in ``data.db`` if it does not exist yet.

    The table stores ``user_id`` (autoincrement primary key), ``username``
    and ``password``.
    """
    conn = sqlite3.connect('data.db')
    try:
        # ``with conn`` commits on success / rolls back on error; it does not
        # close the connection, hence the explicit close below.
        with conn:
            conn.execute(
                'CREATE TABLE IF NOT EXISTS users('
                'user_id INTEGER PRIMARY KEY AUTOINCREMENT, '
                'username TEXT NOT NULL, password TEXT NOT NULL)'
            )
    finally:
        conn.close()
def add_user_data(username, password):
    """Insert a new row into the ``users`` table of ``data.db``.

    Args:
        username: value stored in the ``username`` column.
        password: value stored in the ``password`` column, exactly as given.

    Raises:
        sqlite3.Error: if the insert fails (e.g. the table does not exist).
    """
    conn = sqlite3.connect('data.db')
    try:
        # Commit on success, roll back on failure.
        with conn:
            conn.execute(
                'INSERT INTO users(username, password) VALUES (?,?)',
                (username, password),
            )
    finally:
        # Always release the database handle, even when the insert fails.
        conn.close()
def login_user(username, password):
    """Look up ``users`` rows matching both *username* and *password*.

    Args:
        username: user name to look for.
        password: value compared against the stored ``password`` column
            as-is (no hashing is done here).

    Returns:
        A list of matching rows (empty when nothing matches).
    """
    conn = sqlite3.connect('data.db')
    try:
        cursor = conn.execute(
            'SELECT * FROM users WHERE username =? AND password = ?',
            (username, password),
        )
        return cursor.fetchall()
    finally:
        # Close the connection (and with it the cursor) on every path.
        conn.close()
# def view_all_users(): | |
# Access database | |
# conn = sqlite3.connect('data.db') | |
# c = conn.cursor() | |
# c.execute('SELECT * FROM users') | |
# data = c.fetchall() | |
# c.close() | |
# return data | |
# Export data to CSV | |
def export_csv():
    """Dump the ``login`` and ``users`` tables of ``data.db`` to CSV files.

    Writes ``login.csv`` and ``users.csv`` (without the DataFrame index) in
    the current working directory.

    NOTE(review): ``users.csv`` contains the ``password`` column as stored.
    """
    conn = sqlite3.connect('data.db')
    try:
        for table, csv_path in (('login', 'login.csv'), ('users', 'users.csv')):
            # Table names come from the fixed tuple above, never from input.
            table_df = pd.read_sql_query(f'SELECT * FROM {table}', conn)
            table_df.to_csv(csv_path, index=False)
    finally:
        conn.close()
# Create table to store login data | |
def create_login_table():
    """Create the ``login`` table in ``data.db`` if it does not exist yet.

    Besides the autoincrement ``login_id`` the table holds the user name,
    the login time and TEXT columns for device, IP and location details.
    """
    conn = sqlite3.connect('data.db')
    try:
        # Commit on success; the connection itself is closed in ``finally``.
        with conn:
            conn.execute(
                'CREATE TABLE IF NOT EXISTS login('
                'login_id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, '
                'login_time TEXT NOT NULL, '
                'device_name TEXT, device_uuid TEXT, mac_address TEXT, '
                'device_vendor TEXT, device_model TEXT, device_ram TEXT, '
                'ip_v4 TEXT, ip_country TEXT, ip_region TEXT, ip_city TEXT, '
                'ip_lat TEXT, ip_lon TEXT, isp_name TEXT, isp_org TEXT, '
                'is_vpn TEXT, is_proxy TEXT, is_tor TEXT, is_relay TEXT, '
                'lat TEXT, lon TEXT, suburb TEXT, district TEXT, city TEXT, country TEXT)'
            )
    finally:
        conn.close()
# Add login data to database | |
def add_login_data(user_dict):
    """Store one login record in the ``login`` table of ``data.db``.

    The table is created first if needed. Values are taken from *user_dict*
    through ``get_from_user_dict``, which yields them in the same order as
    the column list below.

    Args:
        user_dict: mapping with the login details of the current attempt.
    """
    # Create login table if it does not exist yet
    create_login_table()

    columns = (
        'username', 'login_time',
        'device_name', 'device_uuid', 'mac_address', 'device_vendor', 'device_model', 'device_ram',
        'ip_v4', 'ip_country', 'ip_region', 'ip_city', 'ip_lat', 'ip_lon', 'isp_name', 'isp_org',
        'is_vpn', 'is_proxy', 'is_tor', 'is_relay',
        'lat', 'lon', 'suburb', 'district', 'city', 'country',
    )
    values = tuple(get_from_user_dict(user_dict))

    # Column names are the constants above; only the values are bound as
    # parameters, and the placeholder count is derived instead of hand-typed.
    placeholders = ', '.join('?' for _ in columns)
    statement = f"INSERT INTO login({', '.join(columns)}) VALUES ({placeholders})"

    conn = sqlite3.connect('data.db')
    try:
        # Commit on success, roll back on failure.
        with conn:
            conn.execute(statement, values)
    finally:
        conn.close()
# Get login data of the user | |
def get_login_data(username):
    """Return every ``login`` row recorded for *username*.

    The ``login`` table is created first if it does not exist, so the query
    itself cannot fail on a missing table.

    Returns:
        A list of row tuples (empty when the user has no login records).
    """
    # Create login table if it does not exist yet
    create_login_table()

    conn = sqlite3.connect('data.db')
    try:
        cursor = conn.execute('SELECT * FROM login WHERE username = ?', (username,))
        return cursor.fetchall()
    finally:
        # Release the database handle on every path.
        conn.close()
# Get data from user dictionary | |
def get_from_user_dict(user_dict):
    """Flatten *user_dict* into a fixed-order tuple of 26 login fields.

    Missing keys yield an empty string. The order is: username, login time,
    device fields, IP/ISP fields, VPN/proxy/tor/relay flags, then location
    fields.
    """
    field_order = (
        'username', 'login_time',
        'device_name', 'device_uuid', 'mac_address', 'device_vendor', 'device_model', 'device_ram',
        'ip_v4', 'ip_country', 'ip_region', 'ip_city', 'ip_lat', 'ip_lon', 'isp_name', 'isp_org',
        'is_vpn', 'is_proxy', 'is_tor', 'is_relay',
        'lat', 'lon', 'suburb', 'district', 'city', 'country',
    )
    return tuple(user_dict.get(key, '') for key in field_order)
def get_from_api(url, value="", timeout=10):
    """Fetch ``url + value`` with an HTTP GET and return the body as text.

    Args:
        url: base URL of the API endpoint.
        value: string appended to *url* (e.g. an IP or MAC address).
        timeout: seconds to wait for the server before giving up. Without a
            timeout the request can block the app indefinitely.

    Returns:
        The decoded response body.

    Raises:
        Exception: if the response status code is not 200.
    """
    response = get(url + value, timeout=timeout)
    if response.status_code != 200:
        raise Exception("[!] Invalid request!")
    return response.content.decode()
def get_ip_info(ip_v4):
    """Look up geolocation, ISP and anonymiser flags for an IPv4 address.

    Queries ip-api.com for country/region/city/coordinates/ISP and
    vpnapi.io (key read from ``st.secrets["vpn_api_key"]``) for the
    VPN / proxy / tor / relay flags.

    Returns:
        An 11-tuple: ip_country, ip_region, ip_city, ip_lat, ip_lon,
        isp_name, isp_org, is_vpn, is_proxy, is_tor, is_relay.

    Raises:
        KeyError: if either service answers without the expected fields.
    """
    # Get information from the ipv4 and parse the JSON answer
    isp = json.loads(get_from_api("http://ip-api.com/json/", ip_v4))
    ip_country = isp["country"]
    ip_region = isp["regionName"]
    ip_city = isp["city"]
    ip_lat = isp["lat"]
    ip_lon = isp["lon"]
    isp_name = isp["isp"]
    isp_org = isp["org"]

    # Detect VPN / proxy / tor
    vpn_api_key = st.secrets["vpn_api_key"]
    # A timeout keeps a stalled lookup from hanging the app forever.
    response = requests.get(
        "https://vpnapi.io/api/" + ip_v4 + "?key=" + vpn_api_key,
        timeout=10,
    )
    security = json.loads(response.text)["security"]
    is_vpn = security['vpn']
    is_proxy = security['proxy']
    is_tor = security['tor']
    is_relay = security['relay']

    return (ip_country, ip_region, ip_city, ip_lat, ip_lon,
            isp_name, isp_org, is_vpn, is_proxy, is_tor, is_relay)
def get_location(lat, lon):
    """Reverse-geocode a coordinate into address components.

    Args:
        lat: latitude as a string.
        lon: longitude as a string.

    Returns:
        ``(location, suburb, district, city, country)``. ``location`` is the
        object returned by the geocoder, or ``None`` when the geocoder has
        no result; the four text parts are ``''`` when unavailable.
    """
    # Get address from given coordinate
    geolocator = Nominatim(user_agent="BAAM")
    location = geolocator.reverse(lat + "," + lon)
    if location is None:
        # No match for this coordinate: report blanks instead of failing on
        # ``None.raw``.
        return None, '', '', '', ''

    address = location.raw.get('address', {})
    suburb = address.get('suburb', '')
    # Prefer 'city_district'; fall back to 'district' when it is missing/empty.
    district = address.get('city_district', '') or address.get('district', '')
    city = address.get('city', '')
    country = address.get('country', '')
    return location, suburb, district, city, country
# def collect_data(username, result, login_time, typing_speed): | |
def collect_data(username, result, login_time):
    """Gather location, device and IP details for the current login attempt.

    Args:
        username: name of the user logging in.
        result: mapping that may hold a ``"GET_LOCATION"`` entry with
            ``"lat"`` / ``"lon"`` values; may be ``None`` or lack the entry.
        login_time: login time, stored unchanged in the returned dict.

    Returns:
        ``(user_dict, location_text)`` where ``user_dict`` holds all fields
        and ``location_text`` is the string form of the reverse-geocoded
        location (``''`` when no coordinates were available).
    """
    lat = ''
    lon = ''
    suburb = ''
    district = ''
    city = ''
    country = ''
    # Must exist even when no coordinates are supplied; otherwise the return
    # statement below fails with UnboundLocalError.
    location = ''

    geo = result.get("GET_LOCATION") if result else None
    if geo:
        lat = str(geo["lat"])
        lon = str(geo["lon"])
    if lat and lon:
        location, suburb, district, city, country = get_location(lat, lon)

    # Collect device information
    device_name = platform.node()
    # Stored in a TEXT column, so it reads back as a string; keep it a string
    # here as well so comparisons against history can match.
    device_uuid = str(uuid.getnode())
    mac_address = gma()
    # Skip the vendor lookup when no MAC address could be determined.
    device_vendor = (
        get_from_api("https://api.macvendors.com/", mac_address) if mac_address else ''
    )
    device_model = platform.platform()
    device_ram = str(round(psutil.virtual_memory().total / (1024.0 ** 3))) + " GB"

    # Collect IP information
    ip_v4 = get_from_api('https://api.ipify.org')
    (ip_country, ip_region, ip_city, ip_lat, ip_lon,
     isp_name, isp_org, is_vpn, is_proxy, is_tor, is_relay) = get_ip_info(ip_v4)

    user_dict = {
        "username": username,
        "login_time": login_time,
        # "typing_speed": typing_speed,
        "device_name": device_name,
        "device_uuid": device_uuid,
        "mac_address": mac_address,
        "device_vendor": device_vendor,
        "device_model": device_model,
        "device_ram": device_ram,
        "ip_v4": ip_v4,
        "ip_country": ip_country,
        "ip_region": ip_region,
        "ip_city": ip_city,
        "ip_lat": ip_lat,
        "ip_lon": ip_lon,
        "isp_name": isp_name,
        "isp_org": isp_org,
        "is_vpn": is_vpn,
        "is_proxy": is_proxy,
        "is_tor": is_tor,
        "is_relay": is_relay,
        "lat": lat,
        "lon": lon,
        "suburb": suburb,
        "district": district,
        "city": city,
        "country": country,
    }
    return user_dict, ('' if location is None else str(location))
# Retrieve login history of the user | |
def get_login_history(username):
    """Return the user's login history as one list per stored field.

    Reads the rows from ``get_login_data`` and transposes row positions
    2..26 (login_time through country) into 25 lists, returned as a tuple in
    that order. All lists are empty when the user has no recorded logins.
    """
    rows = get_login_data(username) or []
    first_position, last_position = 2, 26
    return tuple(
        [record[position] for record in rows]
        for position in range(first_position, last_position + 1)
    )
def submit_test_case(user_dict, location):
    """Render the "Start test case" button and run verification on click.

    On click the user is verified against history, the inputs and outcome
    are stored in ``st.session_state`` for the other pages, and the app
    switches to "TestPass" on success or "InputImage" on failure.
    """
    clicked = st.button("Start test case")
    if not clicked:
        return

    # Verify the test case against the user's historical data
    verification = verify_user(user_dict)

    # Face verification on failure is currently disabled:
    # if not(verification):
    #     verification = verify_face(user_dict.get('username', ''))

    # Share location, user_dict and the outcome with the other pages
    st.session_state['location'] = location
    st.session_state['user_dict'] = user_dict
    st.session_state['verification'] = verification

    # Passed -> result page; failed -> image input page
    next_page = "TestPass" if verification else "InputImage"
    switch_page(next_page)
def show_test_data(user_dict, location):
    """Display the collected login details in two Streamlit columns.

    Values missing from *user_dict* are shown as empty strings.
    """
    ip_rows = (
        ('IP address:', 'ip_v4'),
        ('IP region:', 'ip_region'),
        ('IP city:', 'ip_city'),
        ('IP country:', 'ip_country'),
        ('Is VPN?', 'is_vpn'),
        ('Is Proxy?', 'is_proxy'),
        ('Is Tor Node?', 'is_tor'),
        ('Is Relay?', 'is_relay'),
    )
    device_rows = (
        ('ISP Name:', 'isp_name'),
        ('ISP Organisation:', 'isp_org'),
        ('Device Mac Address:', 'mac_address'),
        ('Device UUID:', 'device_uuid'),
        ('Device Name:', 'device_name'),
        ('Device Vendor:', 'device_vendor'),
        ('Device Model:', 'device_model'),
        ('Device Ram:', 'device_ram'),
    )
    # Resolve every value up front, defaulting to '' like the stored record.
    wanted = [key for _, key in ip_rows + device_rows] + ['login_time']
    current = {key: user_dict.get(key, '') for key in wanted}

    # Show location
    st.write('Location:', location)

    col1, col2 = st.columns(2)
    with col1:
        # IP details
        for label, key in ip_rows:
            st.write(label, current[key])
    with col2:
        # ISP and device details
        for label, key in device_rows:
            st.write(label, current[key])
        # Login time
        # NOTE(review): the listing lost its indentation; this line is
        # assumed to belong to the second column - confirm.
        st.write('Login time:', current['login_time'])
def save_user_image(username, image, input_time):
    """Write *image* to ``img/user_image/<username>/`` and return its path.

    Args:
        username: used for both the directory and the file name prefix.
        image: object exposing ``getbuffer()`` with the image bytes.
        input_time: number whose integer part becomes the file name suffix.

    Returns:
        The path of the written ``.jpg`` file.
    """
    save_dir = f"img/user_image/{username}"
    save_file_path = f"img/user_image/{username}/{username}_" \
                     f"{int(input_time)}.jpg"
    # exist_ok avoids the check-then-create race when two sessions save an
    # image for the same user at the same time.
    os.makedirs(save_dir, exist_ok=True)
    with open(save_file_path, mode='wb') as w:
        w.write(image.getbuffer())
    return save_file_path
def read_user_image(username):
    """Return the path of the user's most recently created stored image.

    Looks in ``img/user_image/<username>`` (created if missing) for files
    ending in ``.jpg``, ``.png`` or ``.jpeg`` (case-insensitive).

    Returns:
        The path with the greatest ``os.path.getctime``, or ``None`` when the
        directory holds no image.
    """
    image_dir = f"img/user_image/{username}"
    os.makedirs(image_dir, exist_ok=True)

    image_extensions = (".jpg", ".png", ".jpeg")
    # splitext looks at the final extension only, so names without a dot no
    # longer raise IndexError and names like "a.b.jpg" are recognised.
    list_image_path = [
        os.path.join(image_dir, name)
        for name in os.listdir(image_dir)
        if os.path.splitext(name)[1].lower() in image_extensions
    ]
    if not list_image_path:
        return None
    return max(list_image_path, key=os.path.getctime)
# @Thao: user historical-data verification logic that decides whether this is the real user
# THAO LE CODE
# 0.5 Adjust the match percentage: a value whose count is within ~20% of the
#     most frequent value's count is treated as a full match (percentage 1)
def get_right_per(user_dict, user_db, match_value, per):
    """Promote a match percentage to 1 for near-most-frequent values.

    Args:
        user_dict: current login details.
        user_db: DataFrame of the user's login history.
        match_value: name of the matched column, or ``""`` for no match.
        per: match percentage computed by the caller.

    Returns:
        0 when nothing matched; 1 when the current value's count is within
        20% of the highest count in that column; otherwise *per* unchanged.
    """
    if match_value == "":
        return 0

    tolerance = 0.2
    # Non-null login_time entries per distinct value of the matched column
    counts = user_db.groupby(match_value).count().login_time
    top = counts.max()
    floor = top * (1 - tolerance)

    observed = counts[user_dict[match_value]]
    return 1 if floor <= observed <= top else per
# 1.1 to get the match value and percentage | |
def check_match_per(user_dict, user_db, check='location'):
    """Find the deepest field of a category that matches the history.

    Fields are tested in order from coarse to specific; the walk stops at
    the first field with no matching history row, except that a miss on
    ``mac_address`` is skipped so ``device_uuid`` is still tested.

    Args:
        user_dict: current login details.
        user_db: DataFrame of the user's login history.
        check: 'location' or 'device'; any other value selects the IP fields.

    Returns:
        ``(match_value, final_per)``: the last matching field name (``''``
        if none) and its share of history rows after adjustment by
        ``get_right_per``.
    """
    fields_by_check = {
        'location': ['country', 'city', 'district', 'suburb'],
        'device': ['device_vendor', 'device_model', 'device_name', 'mac_address', 'device_uuid'],
    }
    fields_check = fields_by_check.get(check, ['ip_country', 'isp_name', 'ip_v4'])

    total_txn = len(user_db)
    match_value = ''
    per = 0
    for field in fields_check:
        hits = len(user_db[user_db[field] == user_dict[field]])
        if hits > 0:
            match_value = field
            per = hits / total_txn
            continue
        if field != 'mac_address':
            # First miss ends the walk ...
            break
        # ... except for mac_address, whose miss is tolerated.

    return match_value, get_right_per(user_dict, user_db, match_value, per)
# 2.1 Get velocity of all transactions | |
def get_vel_all(row):
    """Compute the travel velocity between a login row and the previous one.

    Args:
        row: mapping with 'lat', 'lon', 'pre_lat', 'pre_lon', 'login_time'
            and 'pre_time'.

    Returns:
        ``''`` when the previous coordinate is ``(0, 0)``, ``0`` when no time
        elapsed between the two logins, otherwise distance in km divided by
        the elapsed hours.
    """
    previous = (row['pre_lat'], row['pre_lon'])
    if previous == (0, 0):
        # (0, 0) marks "no previous login"
        return ''

    current = (row['lat'], row['lon'])
    distance_km = GD(current, previous).km
    elapsed_hours = (row['login_time'] - row['pre_time']).total_seconds() / (60 * 60)
    if elapsed_hours == 0:
        return 0
    return distance_km / elapsed_hours
# 2.2 get vel of the latest txn | |
def get_vel_txn(user_dict, user_db):
    """Velocity between the new login and the latest login in the history.

    Args:
        user_dict: current login; 'lat'/'lon' are its coordinates and
            'login_time' is either a datetime or a string formatted as
            ``'%Y-%m-%d %H:%M:%S'``.
        user_db: DataFrame of the user's login history; its last row is
            treated as the latest login.

    Returns:
        Distance in km divided by elapsed hours, or 0 when no time elapsed.
    """
    # Take the last row as a Series so scalar fields can be read directly
    # (float() on a one-element Series is deprecated in pandas).
    latest_txn = user_db.iloc[-1]

    coor_txn = (user_dict['lat'], user_dict['lon'])
    coor_latest = (float(latest_txn['lat']), float(latest_txn['lon']))
    dist = GD(coor_txn, coor_latest).km

    # Parse the login time explicitly instead of discovering the string case
    # through a bare ``except``.
    time_txn = user_dict['login_time']
    if isinstance(time_txn, str):
        time_txn = datetime.strptime(time_txn, '%Y-%m-%d %H:%M:%S')
    interval = (time_txn - latest_txn['login_time']).total_seconds() / (60 * 60)

    # Zero elapsed time would divide by zero; report no movement instead.
    vel = dist / interval if interval != 0 else 0
    print(f'This is vel {vel}')
    return vel
# 2 to get score will be reduced because of jumping | |
def get_score_jump(user_dict, user_db):
    """Return the score penalty for an implausibly fast location change.

    The velocity of the new login (see ``get_vel_txn``) is compared with
    three thresholds: above 600 -> 20, above 80 -> 10, above 40 -> 5,
    otherwise 0. When more than 10% of the history rows already exceed the
    lowest threshold, the penalty is multiplied by 0.1.
    """
    low_limit, medium_limit, high_limit = 40, 80, 600
    frequency_limit = 0.1
    # (velocity threshold, penalty), checked from the highest down
    penalty_ladder = ((high_limit, 20), (medium_limit, 10), (low_limit, 5))

    def _is_jump(record):
        # Rows without a velocity ('') never count as a jump.
        return float(record.vel) > low_limit if record.vel != "" else False

    jump_rows = user_db[user_db.apply(_is_jump, axis=1)]
    jumps_often = len(jump_rows) > len(user_db) * frequency_limit
    damping = 0.1 if jumps_often else 1

    vel_txn = float(get_vel_txn(user_dict, user_db))
    for limit, penalty in penalty_ladder:
        if vel_txn > limit:
            return damping * penalty
    return 0
# 3.1 check vpn (new IP + vpn) | |
def get_vpn_score(user_dict, user_db):
    """Return the penalty for a VPN-like login from an unseen IP address.

    Returns 10 when at least one of the is_vpn / is_proxy / is_tor /
    is_relay flags is set and the IP check does not match on 'ip_v4';
    otherwise 0.
    """
    anonymiser_flags = ('is_vpn', 'is_proxy', 'is_tor', 'is_relay')
    flags_set = sum(user_dict[name] for name in anonymiser_flags)

    matched_field = check_match_per(user_dict, user_db, check='ip')[0]
    if matched_field == 'ip_v4' or not flags_set > 0:
        return 0
    return 10
# 4.Get score | |
def get_risk_score(user_dict, user_db):
    """Combine device, IP and location matches into one trust score.

    Each category contributes ``weight[matched_field] * percentage`` (0 when
    nothing matched); the jump and VPN penalties are subtracted. The
    individual scores and the total are also stored in
    ``st.session_state['score_dict']``.

    Returns:
        The total score.
    """
    weight = {
        'device_uuid': 40, 'mac_address': 40, 'device_name': 30.0,
        'device_model': 20.0, 'device_vendor': 4.0,
        'ip_v4': 30, 'isp_name': 15.0, 'ip_country': 3.0,
        'suburb': 30, 'district': 22.5, 'city': 15.0, 'country': 3.0,
    }

    def _category_score(category):
        matched_field, share = check_match_per(user_dict, user_db, check=category)
        return weight[matched_field] * share if matched_field != '' else 0

    device_score = _category_score('device')
    ip_score = _category_score('ip')
    location_score = _category_score('location')

    jump_score = get_score_jump(user_dict, user_db)
    vpn_score = get_vpn_score(user_dict, user_db)

    print(f'device score {device_score}')
    print(f'ip_score {ip_score}')
    print(f'location_score {location_score}')
    print(f'jump_score {jump_score}')
    print(f'vpn_score {vpn_score}')

    total_score = device_score + ip_score + location_score - (jump_score + vpn_score)
    st.session_state['score_dict'] = {
        "device_score": device_score,
        "ip_score": ip_score,
        "location_score": location_score,
        "jump_score": jump_score,
        "vpn_score": vpn_score,
        "total_score": total_score,
    }
    return total_score
# User verification | |
def verify_user(user_dict):
    """Decide whether the current login looks like the real user.

    Builds a DataFrame from the user's login history, derives the velocity
    between consecutive logins and compares the resulting trust score with
    a threshold of 30.

    Side effects: stores ``score_dict`` (via ``get_risk_score``, or an empty
    dict on first login) and ``risk_threshold`` in ``st.session_state``.

    Args:
        user_dict: login details of the current attempt.

    Returns:
        ``True`` on the user's first login or when the trust score reaches
        the threshold; ``False`` otherwise.
    """
    username = user_dict.get('username', '')

    col = ['login_time', 'device_name', 'device_uuid', 'mac_address', 'device_vendor', 'device_model', 'device_ram',
           'ip_v4', 'ip_country', 'ip_region', 'ip_city', 'ip_lat', 'ip_lon', 'isp_name', 'isp_org',
           'is_vpn', 'is_proxy', 'is_tor', 'is_relay',
           'lat', 'lon', 'suburb', 'district', 'city', 'country']
    # Query the history once; it is a tuple of per-field lists, so the
    # transpose yields one row per recorded login.
    user_db = pd.DataFrame(get_login_history(username)).T
    user_db.columns = col

    if len(user_db) == 0:
        print('This is the 1st login time of this username')
        st.session_state['score_dict'] = {}
        return True

    # Velocity between consecutive logins feeds the jump score
    # (H: 600+, M: 80 - 600, L: 40 - 80) and the jump frequency.
    user_db.login_time = pd.to_datetime(user_db.login_time)
    user_db['pre_lat'] = user_db['lat'].shift(periods=1, fill_value=0)
    user_db['pre_lon'] = user_db['lon'].shift(periods=1, fill_value=0)
    # No integer fill here: pandas >= 2.0 raises TypeError for an int
    # fill_value on a datetime column. The first row's pre_time (NaT) is
    # never read because its (pre_lat, pre_lon) is (0, 0).
    user_db['pre_time'] = user_db['login_time'].shift(periods=1)
    user_db['vel'] = user_db.apply(get_vel_all, axis=1)

    trust_score = get_risk_score(user_dict, user_db)
    risk_threshold = 30
    st.session_state['risk_threshold'] = risk_threshold
    print(f'trust_score is {trust_score}')

    verification = not trust_score < risk_threshold
    print(f'verification {verification}')
    return verification
# @Dora: This is to put the function face verification | |
# Face verification | |
def verify_face(username, img_file_buffer):
    """Face verification placeholder: currently always returns ``True``.

    Both arguments are unused while the image comparison below stays
    disabled.
    """
    # Disabled implementation, kept for reference:
    # input_time = datetime.now().timestamp()
    # latest_history_image = read_user_image(username)
    # if latest_history_image:
    #     image_path = save_user_image(username, img_file_buffer, input_time)
    #     face_verification = face_verfication([[latest_history_image,image_path]])
    #     os.remove(image_path)
    # else:
    #     face_verification = True
    # if face_verification:
    #     image_path = save_user_image(username, img_file_buffer, input_time)
    # print(f"verify face:{face_verification}")
    return True