1 parent abe6076 · commit ff21d52
Showing 4 changed files with 369 additions and 3 deletions.
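All of the scripts in this commit read their configuration from a .env file via python-dotenv. For reference, a minimal sketch of that file covering the four variables the scripts actually read; the instance URL follows the endpoints hardcoded in the second script, the other values are placeholders:

DIRECTUS_INSTANCE=https://emi-collection.unifr.ch/directus
DIRECTUS_EMAIL=user@example.com
DIRECTUS_PASSWORD=change-me
OUT_CSV_PATH=/path/to/csv/exports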
@@ -0,0 +1,99 @@
import math
import os
import typing

import pandas as pd
import requests
from dotenv import load_dotenv

load_dotenv()

# Define the Directus instance, mail and password from .env
directus_instance = os.getenv("DIRECTUS_INSTANCE")
directus_login = f"{directus_instance}/auth/login"

# Define the collection name and API url
collection_name = "Field_Data"
directus_api = f"{directus_instance}/items/{collection_name}"
directus_email = os.getenv("DIRECTUS_EMAIL")
directus_password = os.getenv("DIRECTUS_PASSWORD")

# Create a session object for making requests
session = requests.Session()

# Send a POST request to the login endpoint
response = session.post(directus_login, json={"email": directus_email, "password": directus_password})

# Test if connection is successful
if response.status_code == 200:
    # Store the access token
    data = response.json()["data"]
    directus_token = data["access_token"]

    # Construct headers with the authentication token
    headers = {
        "Authorization": f"Bearer {directus_token}",
        "Content-Type": "application/json",
    }

    out_csv_path = str(os.getenv("OUT_CSV_PATH"))

    # Iterate over all CSV files in the input folder and its subdirectories
    for root, _dirs, files in os.walk(out_csv_path):
        for filename in files:
            # Retrieve the project name from the directory name
            project = root.split("/")[-1]

            # Ignore the old layer without sample_id
            if filename.endswith(".csv") and filename != "SBL_20004_2022_EPSG:4326.csv":
                # Read each CSV into a dataframe
                constructed_path = root + "/" + filename
                df = pd.read_csv(constructed_path)

                # Add the QField project to the dataframe
                df["qfield_project"] = project

                # Create an empty dictionary to store the fields to create
                observation: dict[str, typing.Any] = {}

                # Format each observation for Directus
                for col_name in df.columns:
                    # Replace dots with underscores in field names
                    new_col_name = col_name.replace(".", "_")
                    # Add to the dictionary, initialized with None
                    observation[new_col_name] = None

                # Iterate over each row in the DataFrame
                for i in range(len(df)):
                    # Convert each row to a dictionary
                    obs = df.iloc[i].to_dict()

                    # Convert problematic float values (NaN is not valid JSON)
                    for key, value in obs.items():
                        if isinstance(value, float) and (math.isnan(value) or math.isinf(value)):
                            obs[key] = None if math.isnan(value) else float(value)

                    # Update the observation dictionary with values from the current row
                    for col_name, value in obs.items():
                        observation[col_name.replace(".", "_")] = value

                    # Send the POST request to create the observation
                    response = session.post(url=directus_api, headers=headers, json=observation)
                    # A 400 means the record already exists: look it up and patch it instead
                    if response.status_code == 400:
                        sample_code = obs["sample_id"]
                        response_get = session.get(f"{directus_api}?filter[sample_id][_eq]={sample_code}&limit=1")
                        if response_get.json() != {"data": []}:
                            data = response_get.json()["data"][0]
                            id_sample = data["id"]
                            directus_observation = f"{directus_api}/{id_sample}"
                            response2 = session.patch(url=directus_observation, headers=headers, json=observation)
                            if response2.status_code != 200:
                                print(f"Error: {response2.status_code} - {response2.text}")
                        else:
                            print(f"{obs['sample_id']} contains non-unique fields.")
                    elif response.status_code != 200:
                        print(f"Error: {response.status_code} - {response.text}")
                        print(obs["sample_id"])
                        print(filename)
                        print(obs)
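The POST-then-PATCH fallback above amounts to an upsert keyed on sample_id. A minimal sketch of the same pattern factored into a helper (hypothetical function name; same endpoint, headers, and filter syntax as the script above):

def upsert_observation(session, api, headers, observation, sample_id):
    # Try to create the record; on a 400, patch the existing record with the same sample_id
    response = session.post(url=api, headers=headers, json=observation)
    if response.status_code != 400:
        return response
    lookup = session.get(f"{api}?filter[sample_id][_eq]={sample_id}&limit=1", headers=headers)
    records = lookup.json().get("data", [])
    if not records:
        # Nothing to patch: surface the original failure
        return response
    return session.patch(url=f"{api}/{records[0]['id']}", headers=headers, json=observation)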
@@ -0,0 +1,128 @@
import os

import pandas as pd
import requests
from dotenv import load_dotenv

load_dotenv()

# Define the Directus instance, mail and password from .env
directus_instance = os.getenv("DIRECTUS_INSTANCE")
directus_login = f"{directus_instance}/auth/login"

# Define the collection name and API url
collection_name = "Field_Data"
directus_api = f"{directus_instance}/items/{collection_name}"
directus_email = os.getenv("DIRECTUS_EMAIL")
directus_password = os.getenv("DIRECTUS_PASSWORD")

# Create a session object for making requests
session = requests.Session()

# Send a POST request to the login endpoint
response = session.post(directus_login, json={"email": directus_email, "password": directus_password})


# Get the primary key of a Field_Data record from its sample id
def get_primary_key_field(sample_code: str) -> int:
    params = {
        "filter[sample_id][_eq]": sample_code,
        "fields": "id",
    }
    response = requests.get("https://emi-collection.unifr.ch/directus/items/Field_Data/", params=params)
    if response.status_code == 200:
        data = response.json()
        if data["data"]:
            return int(data["data"][0]["id"])
    return -1


# Get the primary key of a Containers record from its container id
def get_primary_key_container(sample_code: str) -> int:
    params = {"filter[container_id][_eq]": sample_code, "fields": "id"}
    response = requests.get("https://emi-collection.unifr.ch/directus/items/Containers/", params=params)
    if response.status_code == 200:
        data = response.json()
        if data["data"]:
            return int(data["data"][0]["id"])
    return -1


# Get the primary key of a Dried_Samples_Data record from its sample container key
def get_primary_key_dried(sample_code: int) -> int:
    params = {"filter[sample_container][_eq]": str(sample_code), "fields": "id"}
    response = requests.get("https://emi-collection.unifr.ch/directus/items/Dried_Samples_Data/", params=params)
    if response.status_code == 200:
        data = response.json()
        if data["data"]:
            return int(data["data"][0]["id"])
    return -1


# Get the primary key of an Extraction_Data record from its parent sample container key
def get_primary_key_ext(sample_code: int) -> int:
    params = {"filter[parent_sample_container][_eq]": str(sample_code), "fields": "id"}
    response = requests.get("https://emi-collection.unifr.ch/directus/items/Extraction_Data/", params=params)
    if response.status_code == 200:
        data = response.json()
        if data["data"]:
            return int(data["data"][0]["id"])
    return -1


# Test if connection is successful
if response.status_code == 200:
    # Store the access token
    data = response.json()["data"]
    directus_token = data["access_token"]

    # Construct headers with the authentication token
    headers = {
        "Authorization": f"Bearer {directus_token}",
        "Content-Type": "application/json",
    }
    # Fetch every Field_Data record, then link the related records to it
    response_get = session.get(f"{directus_api}?limit=-1")
    data = response_get.json()["data"]
    df = pd.DataFrame(data)
    for _index, row in df.iterrows():
        sample_id = row["sample_id"]
        id_container = get_primary_key_container(sample_id)
        id_field = get_primary_key_field(sample_id)
        id_dried = get_primary_key_dried(int(id_container))
        id_ext = get_primary_key_ext(int(id_container))
        # Link the dried sample record to its field data record
        directus_observation_dried = f"https://emi-collection.unifr.ch/directus/items/Dried_Samples_Data/{id_dried}"
        response_patch = session.patch(url=directus_observation_dried, headers=headers, json={"field_data": id_field})
        print(
            f"sample id: {sample_id}, id container: {id_container}, id field: {id_field}, id dried: {id_dried}, id ext: {id_ext}"
        )
        if response_patch.status_code != 200:
            print(
                f"sample id: {sample_id}, id: {id_dried}, error: {response_patch.status_code}, message: {response_patch.text}"
            )
        # Link the extraction record to its field data record
        directus_observation_ext = f"https://emi-collection.unifr.ch/directus/items/Extraction_Data/{id_ext}"
        response_patch_ext = session.patch(url=directus_observation_ext, headers=headers, json={"field_data": id_field})
        if response_patch_ext.status_code != 200:
            print(
                f"sample id: {sample_id}, id: {id_ext}, error: {response_patch_ext.status_code}, message: {response_patch_ext.text}"
            )
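The four lookup functions above differ only in the collection queried and the filter field. A minimal generic sketch of the same pattern (hypothetical helper name; same Directus endpoints and return convention):

def get_primary_key(collection: str, filter_field: str, value: str) -> int:
    # Return the primary key of the first matching record, or -1 if none is found
    params = {f"filter[{filter_field}][_eq]": value, "fields": "id"}
    response = requests.get(
        f"https://emi-collection.unifr.ch/directus/items/{collection}/",
        params=params,
        timeout=10,
    )
    if response.status_code == 200:
        data = response.json()["data"]
        if data:
            return int(data[0]["id"])
    return -1

# Equivalent calls:
# get_primary_key("Field_Data", "sample_id", sample_id)
# get_primary_key("Dried_Samples_Data", "sample_container", str(id_container))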
@@ -0,0 +1,139 @@
import os

import pandas as pd
import requests
from dotenv import load_dotenv

# Load .env variables
load_dotenv()

# Define the Directus instance, mail and password from .env
directus_instance = os.getenv("DIRECTUS_INSTANCE")
directus_login = f"{directus_instance}/auth/login"

# Define the collection name and API url
collection_name = "Field_Data"
directus_api = f"{directus_instance}/items/{collection_name}/"
directus_email = os.getenv("DIRECTUS_EMAIL")
directus_password = os.getenv("DIRECTUS_PASSWORD")

# Create a session object for making requests
session = requests.Session()

# Send a POST request to the login endpoint
response = session.post(directus_login, json={"email": directus_email, "password": directus_password})

# Test if connection is successful
if response.status_code == 200:
    # Store the access token
    data = response.json()["data"]
    directus_token = data["access_token"]

    # Construct headers with the authentication token
    headers = {
        "Authorization": f"Bearer {directus_token}",
        "Content-Type": "application/json",
    }

    out_csv_path = str(os.getenv("OUT_CSV_PATH"))

    # Iterate over all CSV files in the input folder and its subdirectories
    for root, _dirs, files in os.walk(out_csv_path):
        for filename in files:
            # Retrieve the project name from the directory name
            project = root.split("/")[-1]
            # Ignore the old layer without sample_id
            if filename.endswith(".csv") and filename != "SBL_20004_2022_EPSG:4326.csv":
                # Read each CSV into a dataframe
                constructed_path = root + "/" + filename
                df = pd.read_csv(constructed_path)

                # Add the QField project to the dataframe
                df["qfield_project"] = project

                # Define the length threshold above which a column becomes a text field
                threshold = 255

                # Dictionary storing the longest value of each column
                longest_content = {}

                # Dictionary mapping cleaned field names to original column names
                observation = {}

                # Loop over the columns to build the dictionaries
                for col_name in df.columns:
                    # Replace dots with underscores in field names
                    new_col_name = col_name.replace(".", "_")
                    # Add to the dictionary
                    observation[new_col_name] = col_name

                    # Find the longest content in the column
                    longest = df[col_name].astype(str).apply(len).max()

                    # Store the longest content for the column (1 if the column is empty)
                    if pd.notna(longest):
                        longest_content[new_col_name] = longest
                    else:
                        longest_content[new_col_name] = 1

                # Request Directus to create the columns
                for i in observation:
                    col_init = str(observation[i]).replace("['", "")
                    col = col_init.replace("']", "")
                    col_clean = col.replace(".", "_")
                    df_type = str(df[col].dtype)
                    df_col_name = str(df[col].name)

                    # Map pandas dtypes to Directus types
                    if df_type == "object" and longest_content[i] < threshold:
                        dir_type = "string"
                    elif df_type == "int64" and longest_content[i] < threshold:
                        dir_type = "integer"
                    elif df_type == "bool" and longest_content[i] < threshold:
                        dir_type = "boolean"
                    elif df_type == "float64" and longest_content[i] < threshold:
                        dir_type = "float"
                    elif longest_content[i] >= threshold:
                        dir_type = "text"
                    else:
                        # If the type is not handled yet, print it so it can be integrated easily
                        print(f"not handled type: {df_type}, longest content: {longest_content[i]}")
                    if df_col_name == "geojson.coordinates":
                        dir_type = "geometry.Point"

                    # Create the patch url
                    url_patch = f"{directus_instance}/fields/{collection_name}/{col_clean}"

                    # Construct the Directus url
                    url = f"{directus_instance}/fields/{collection_name}"
                    # Create a field for each csv column
                    data = {"field": col_clean, "type": dir_type}

                    # Make the Directus request
                    response = requests.post(url, json=data, headers=headers, timeout=10)
                    # Check if the field was created successfully
                    if response.status_code == 200:
                        # If the field is of type geometry.Point, add a validation so the map displays correctly
                        if dir_type == "geometry.Point":
                            validation = {"meta": {"validation": {"_and": [{col_clean: {"_intersects_bbox": None}}]}}}
                            response = requests.patch(url_patch, json=validation, headers=headers, timeout=10)
                            if response.status_code != 200:
                                print("error adding validation")
                    # A 400 means the field already exists: update it instead
                    elif response.status_code == 400:
                        response = requests.patch(url_patch, json=data, headers=headers, timeout=10)
                        if response.status_code != 200:
                            print(f"error creating/updating field {col_clean}")
                    # Otherwise print the error details, the type and the column name
                    else:
                        print(response.status_code)
                        print(response.text)
                        print(dir_type)
                        print(col_clean)
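The dtype mapping in the if/elif chain above can equally be expressed as a lookup table. A minimal sketch under the same threshold rule (hypothetical names; unlike the script, unhandled dtypes raise a KeyError instead of printing):

# pandas dtype -> Directus field type, as used by the script above
PANDAS_TO_DIRECTUS = {
    "object": "string",
    "int64": "integer",
    "bool": "boolean",
    "float64": "float",
}

def directus_type(df_type: str, longest: int, threshold: int = 255) -> str:
    # Long columns become text fields regardless of dtype
    if longest >= threshold:
        return "text"
    return PANDAS_TO_DIRECTUS[df_type]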