Commit 35d26986 authored by Pablo Boizeau

Improve CI, linter and formatting

- Add mypy to CI
- Add cache to CI
- Fix Python file formatting
- Add `py.typed` file (makes the package PEP 561 compliant; see the sketch below)

See merge request !10
parent 2de16b9c
Pipeline #272934 passed
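The sketch below is an illustration (not part of this merge request) of what the `py.typed` marker and the new mypy job buy: once the marker ships with the package, PEP 561 lets type checkers resolve `theia_dumper`'s inline annotations, so a downstream call to `delete_stac_obj` is checked against the signature introduced in this commit (`item_id: str | None = None`). Everything below other than `delete_stac_obj` is a hypothetical name.

```python
# Hypothetical downstream module, assuming theia_dumper is installed together
# with its py.typed marker so mypy trusts the package's inline annotations.
from theia_dumper.stac import delete_stac_obj


def drop_collection(endpoint: str, col_id: str) -> None:
    """Delete a whole collection on the given STAC endpoint."""
    # item_id is annotated `str | None` in the library, so None is accepted here;
    # passing e.g. an int instead would be reported by `mypy .`.
    delete_stac_obj(stac_endpoint=endpoint, col_id=col_id, item_id=None)
```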
default:
image: python:3.10-slim
image: python:3.12-slim
workflow:
rules:
- if: $CI_MERGE_REQUEST_ID
- if: $CI_COMMIT_BRANCH == 'main'
cache:
paths:
- .cache/pip
- .venv
before_script:
- pip install virtualenv
- virtualenv .venv
- source .venv/bin/activate
- pip install -e .
stages:
- Static Analysis
- Install
- Static Analysis
- Test
- Ship
- Documentation
# ------------------------------- Install -------------------------------------
pip_install:
stage: Install
before_script:
- python --version ; pip --version
script:
- pip install .
- pip list -v
# ------------------------------ Static analysis ------------------------------
.static_analysis_base:
stage: Static Analysis
allow_failure: true
except:
- main
flake8:
needs:
- pip_install
extends: .static_analysis_base
script:
- pip install flake8
- flake8 $PWD/theia_dumper
- flake8 --ignore E501 ./theia_dumper
pylint:
needs:
- pip_install
extends: .static_analysis_base
script:
- pip install pylint appdirs requests pystac pystac-client pydantic urllib3
- pylint $PWD/theia_dumper --disable=W0718,W0603,R0914,C0415
- pip install pylint
- pylint ./theia_dumper
codespell:
needs:
- pip_install
extends: .static_analysis_base
script:
- pip install codespell
- codespell theia_dumper docs README.md
- codespell ./theia_dumper docs README.md
pydocstyle:
needs:
- pip_install
extends: .static_analysis_base
script:
- pip install pydocstyle
- pydocstyle $PWD/theia_dumper
# ------------------------------- Install -------------------------------------
- pydocstyle ./theia_dumper
pip_install:
stage: Install
except:
- main
mypy:
needs:
- pip_install
extends: .static_analysis_base
script:
- pip install .
- pip install mypy
- pip list -v
- mypy --install-types --non-interactive .
# --------------------------------- Doc ---------------------------------------
.doc_base:
stage: Documentation
before_script:
- pip install -e .
- pip install -r doc/doc_requirements.txt
artifacts:
paths:
- public
@@ -71,6 +96,7 @@ test:
except:
- main
script:
- pip install -r doc/doc_requirements.txt
- mkdocs build --site-dir public_test
pages:
@@ -78,6 +104,7 @@ pages:
only:
- main
script:
- pip install -r doc/doc_requirements.txt
- mkdocs build --site-dir public
# --------------------------------- Test --------------------------------------
@@ -86,9 +113,6 @@ pages:
stage: Test
except:
- main
before_script:
- pip install .
Tests:
extends: .tests_base
......
@@ -22,4 +22,3 @@ for path in sorted(Path("theia_dumper").rglob("*.py")): #
print("::: " + identifier, file=fd) #
mkdocs_gen_files.set_edit_path(full_doc_path, path)
[build-system]
requires = ["setuptools >= 61.0", "wheel"]
requires = ["setuptools >= 61.2", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "theia_dumper"
version = "0.0.3"
description = "THEIA-MTP geospatial data publisher"
authors = [
{ name = "Rémi Cresson", email = "remi.cresson@inrae.fr" },
]
authors = [{ name = "Rémi Cresson", email = "remi.cresson@inrae.fr" }]
requires-python = ">=3.9"
dependencies = [
"setuptools",
"pystac",
"pystac_client",
"dinamis_sdk==0.3.3",
"dinamis_sdk==0.3.4",
"requests",
"rio-cogeo"
"rio-cogeo",
]
license = { text = "Apache-2.0" }
classifiers = [
@@ -31,12 +30,19 @@ classifiers = [
[project.scripts]
theia_dumper = "theia_dumper.cli:theia_dumper"
[tool.setuptools]
packages = ["theia_dumper"]
[tool.setuptools.dynamic]
version = { attr = "theia_dumper.__version__" }
[tool.pydocstyle]
convention = "google"
[tool.mypy]
show_error_codes = true
pretty = true
exclude = ["doc", "venv", ".venv"]
[tool.pylint]
disable = "W1203,R0903,E0401,W0622,C0116,C0115"
[tool.setuptools]
packages = ["theia_dumper"]
tests/all.py 100644 → 100755
import pystac
from datetime import datetime
import requests
import tempfile
#!/usr/bin/env python
import os
import shutil
import tempfile
from datetime import datetime
import pystac
import requests
from theia_dumper import stac
@@ -11,7 +14,7 @@ handler = stac.TransactionsHandler(
stac_endpoint="https://stacapi-cdos.apps.okd.crocc.meso.umontpellier.fr",
storage_endpoint="https://s3-data.meso.umontpellier.fr",
storage_bucket="sm1-gdc-tests",
assets_overwrite=True
assets_overwrite=True,
)
image_href = (
@@ -20,15 +23,12 @@ image_href = (
)
col_id = "collection-for-theia-dumper-tests"
items_ids = [
"item_1",
"item_2"
]
items_ids = ["item_1", "item_2"]
raster_file1 = "/tmp/raster1.tif"
raster_file2 = "/tmp/raster2.tif"
with open(raster_file1, 'wb') as f:
r = requests.get(image_href)
with open(raster_file1, "wb") as f:
r = requests.get(image_href, timeout=5)
f.write(r.content)
shutil.copyfile(raster_file1, raster_file2)
@@ -47,20 +47,22 @@ def create_item(item_id: str):
geometry={
"type": "Polygon",
"coordinates": [
[[4.032730583418401, 43.547450099338604],
[4.036414917971517, 43.75162726634343],
[3.698685718905037, 43.75431706444037],
[3.6962018175925073, 43.55012996681564],
[4.032730583418401, 43.547450099338604]]
]
[
[4.032730583418401, 43.547450099338604],
[4.036414917971517, 43.75162726634343],
[3.698685718905037, 43.75431706444037],
[3.6962018175925073, 43.55012996681564],
[4.032730583418401, 43.547450099338604],
]
],
},
bbox=[0.954895, 43.562481, 4.443054, 44.791582],
datetime=datetime.now().replace(year=1999),
properties={},
assets={
"ndvi": pystac.Asset(href=raster_file1),
"crswir": pystac.Asset(href=raster_file2)
}
"crswir": pystac.Asset(href=raster_file2),
},
)
return item
@@ -69,9 +71,7 @@ def create_item(item_id: str):
def create_collection():
"""Create an empty STAC collection."""
spat_extent = pystac.SpatialExtent([[0, 0, 2, 3]])
temp_extent = pystac.TemporalExtent(
intervals=[(None, None)]
)
temp_extent = pystac.TemporalExtent(intervals=[(None, None)])
col = pystac.Collection(
id=col_id,
extent=pystac.Extent(spat_extent, temp_extent),
@@ -79,7 +79,7 @@ def create_collection():
href="http://hello.fr/collections/collection-for-tests",
providers=[
pystac.Provider("INRAE"),
]
],
)
return col
@@ -87,10 +87,7 @@ def create_collection():
def create_items_and_collection(relative):
"""Create two STAC items attached to one collection."""
# Create items
items = [
create_item(item_id=item_id)
for item_id in items_ids
]
items = [create_item(item_id=item_id) for item_id in items_ids]
# Attach items to collection
col = create_collection()
@@ -110,7 +107,8 @@ def generate_collection(root_dir, relative=True):
col.normalize_hrefs(root_dir)
col.save(
catalog_type=pystac.CatalogType.RELATIVE_PUBLISHED
if relative else pystac.CatalogType.ABSOLUTE_PUBLISHED
if relative
else pystac.CatalogType.ABSOLUTE_PUBLISHED
)
......
"""Theia dumper package."""
"""Theia-dumper Command Line Interface."""
import click
from .stac import TransactionsHandler, delete_stac_obj
@@ -8,84 +9,67 @@ DEFAULT_STAC_EP = "https://stacapi-cdos.apps.okd.crocc.meso.umontpellier.fr"
@click.group()
def theia_dumper() -> None:
pass
"""Theia dumper CLI tool."""
@theia_dumper.command(context_settings={'show_default': True})
@theia_dumper.command(context_settings={"show_default": True})
@click.argument("stac_obj_path")
@click.option(
'--stac_endpoint',
"--stac_endpoint",
help="Endpoint to which STAC objects will be sent",
type=str,
default=DEFAULT_STAC_EP
default=DEFAULT_STAC_EP,
)
@click.option(
'--storage_endpoint',
"--storage_endpoint",
type=str,
help="Storage endpoint assets will be sent to",
default="https://s3-data.meso.umontpellier.fr"
default="https://s3-data.meso.umontpellier.fr",
)
@click.option(
'-b',
'--storage_bucket',
"-b",
"--storage_bucket",
help="Storage bucket assets will be sent to",
type=str,
default="sm1-gdc"
default="sm1-gdc",
)
@click.option(
"-o",
"--overwrite",
is_flag=True,
default=False,
help="Overwrite assets if already existing")
help="Overwrite assets if already existing",
)
def publish(
stac_obj_path: str,
stac_endpoint: str,
storage_endpoint: str,
storage_bucket: str,
overwrite: bool
stac_obj_path: str,
stac_endpoint: str,
storage_endpoint: str,
storage_bucket: str,
overwrite: bool,
):
"""
Publish a STAC object (collection or item collection)
"""
"""Publish a STAC object (collection or item collection)."""
handler = TransactionsHandler(
stac_endpoint=stac_endpoint,
storage_endpoint=storage_endpoint,
storage_bucket=storage_bucket,
assets_overwrite=overwrite
assets_overwrite=overwrite,
)
handler.load_and_publish(stac_obj_path)
@theia_dumper.command(context_settings={'show_default': True})
@theia_dumper.command(context_settings={"show_default": True})
@click.option(
'--stac_endpoint',
"--stac_endpoint",
help="Endpoint to which STAC objects will be sent",
type=str,
default=DEFAULT_STAC_EP
)
@click.option(
"-c",
"--col_id",
type=str,
help="STAC collection ID",
required=True
)
@click.option(
"-i",
"--item_id",
type=str,
default=None,
help="STAC item ID"
default=DEFAULT_STAC_EP,
)
@click.option("-c", "--col_id", type=str, help="STAC collection ID", required=True)
@click.option("-i", "--item_id", type=str, default=None, help="STAC item ID")
def delete(
stac_endpoint: str,
col_id: str,
item_id: str,
stac_endpoint: str,
col_id: str,
item_id: str,
):
"""
Publish a STAC object (collection or item collection)
"""
delete_stac_obj(
stac_endpoint=stac_endpoint,
col_id=col_id,
item_id=item_id
)
"""Publish a STAC object (collection or item collection)."""
delete_stac_obj(stac_endpoint=stac_endpoint, col_id=col_id, item_id=item_id)
"""Logging stuff."""
import logging
import sys
import os
logging.basicConfig(format='%(levelname)s:%(message)s', level=os.environ.get("LOGLEVEL", "INFO"), stream=sys.stdout)
logging.basicConfig(
format="%(levelname)s:%(message)s",
level=os.environ.get("LOGLEVEL", "INFO"),
stream=sys.stdout,
)
logger = logging.getLogger(__name__)
"""STAC stuff."""
import pystac
from pystac import Collection, ItemCollection, Item
from .logger import logger
from urllib.parse import urljoin
from typing import List
import os
from ast import literal_eval
from dataclasses import dataclass
from typing import List
from urllib.parse import urljoin
import dinamis_sdk
import dinamis_sdk.auth
import dinamis_sdk.settings
import pystac
import requests
from pystac import Collection, Item, ItemCollection
from requests.adapters import HTTPAdapter, Retry
from dataclasses import dataclass
from .logger import logger
class STACObjectUnresolved(Exception):
pass
"""Unresolved STAC object exception."""
class UnconsistentCollectionIDs(Exception):
pass
"""Inconsistent STAC collection exception."""
def create_session():
"""Create a requests session"""
"""Create a requests session."""
sess = requests.Session()
retries = Retry(
total=5,
backoff_factor=1,
status_forcelist=[400, 403, 408, 410, 419, 421, 422,
424, 425, 429, 500, 502, 503, 504, 505],
allowed_methods=frozenset(["PUT", "POST"])
status_forcelist=[
400,
403,
408,
410,
419,
421,
422,
424,
425,
429,
500,
502,
503,
504,
505,
],
allowed_methods=frozenset(["PUT", "POST"]),
)
adapter = HTTPAdapter(max_retries=retries)
sess.mount("http://", adapter=adapter)
@@ -39,9 +58,7 @@ def create_session():
def post_or_put(url: str, data: dict):
"""Post or put data to url."""
headers = {
"Authorization": f"Bearer {dinamis_sdk.auth.get_access_token()}"
}
headers = {"Authorization": f"Bearer {dinamis_sdk.auth.get_access_token()}"}
sess = create_session()
resp = sess.post(url, json=data, headers=headers)
if resp.status_code == 409:
@@ -49,7 +66,7 @@ def post_or_put(url: str, data: dict):
resp = sess.put(
f"{url}/{data['id']}",
json=data,
headers=headers
headers=headers,
)
# Unchanged may throw a 404
if not resp.status_code == 404:
@@ -58,8 +75,8 @@ def post_or_put(url: str, data: dict):
try:
resp.raise_for_status()
except Exception as e:
logger.error(eval(resp.content)["detail"])
raise (e)
logger.error(literal_eval(resp.content)["detail"])
raise e
def load(obj_pth):
@@ -67,7 +84,7 @@ def load(obj_pth):
for obj_name, cls in {
"collection": Collection,
"item collection": ItemCollection,
"item": Item
"item": Item,
}.items():
logger.debug("Try to read file %s", obj_pth)
try:
@@ -83,22 +100,18 @@ def load(obj_pth):
def get_assets_root_dir(items: List[Item]) -> str:
"""Get the common prefix of all items assets paths."""
prefix = os.path.commonprefix([
asset.href
for item in items
for asset in item.assets.values()
])
prefix = os.path.commonprefix(
[asset.href for item in items for asset in item.assets.values()]
)
if os.path.isdir(prefix):
return prefix
return os.path.dirname(prefix)
def check_items_collection_id(items: List[Item]):
"""Check that items collection_id is unique"""
"""Check that items collection_id is unique."""
if len(set(item.collection_id for item in items)) != 1:
raise UnconsistentCollectionIDs(
"Collection ID must be the same for all items!"
)
raise UnconsistentCollectionIDs("Collection ID must be the same for all items!")
def get_col_href(col: Collection):
@@ -106,6 +119,7 @@ def get_col_href(col: Collection):
for link in col.links:
if link.rel == "self":
return link.href
return ""
def get_col_items(col: Collection) -> List[Item]:
@@ -113,17 +127,16 @@ def get_col_items(col: Collection) -> List[Item]:
col_href = get_col_href(col=col)
return [
load(
os.path.join(
os.path.dirname(col_href),
link.href[2:]
) if link.href.startswith("./") else link.href
os.path.join(os.path.dirname(col_href), link.href[2:])
if link.href.startswith("./")
else link.href
)
for link in col.links
if link.rel == "item"
]
def delete_stac_obj(stac_endpoint: str, col_id: str, item_id: str = None):
def delete_stac_obj(stac_endpoint: str, col_id: str, item_id: str | None = None):
"""Delete an item or a collection."""
logger.info("Deleting %s%s", col_id, f"/{item_id}" if item_id else "")
if item_id:
@@ -132,10 +145,8 @@ def delete_stac_obj(stac_endpoint: str, col_id: str, item_id: str = None):
url = f"{stac_endpoint}/collections/{col_id}"
resp = requests.delete(
url,
headers={
"Authorization":
f"Bearer {dinamis_sdk.auth.get_access_token()}"
}
headers={"Authorization": f"Bearer {dinamis_sdk.auth.get_access_token()}"},
timeout=5,
)
if resp.status_code != 200:
logger.warning("Deletion failed (%s)", resp.text)
@@ -144,70 +155,49 @@ def delete_stac_obj(stac_endpoint: str, col_id: str, item_id: str = None):
@dataclass
class TransactionsHandler:
"""Handle STAC and storage transactions."""
stac_endpoint: str
storage_endpoint: str
storage_bucket: str
assets_overwrite: bool
def publish_item(self, item: Item, assets_root_dir: str):
"""Publish an item and all its assets"""
"""Publish an item and all its assets."""
col_id = item.collection_id
target_root_dir = urljoin(
self.storage_endpoint,
self.storage_bucket,
col_id
)
target_root_dir = urljoin(self.storage_endpoint, self.storage_bucket)
# Upload assets files
for _, asset in item.assets.items():
local_filename = asset.href
logger.debug("Local file: %s", local_filename)
target_url = local_filename.replace(
assets_root_dir,
target_root_dir
)
target_url = local_filename.replace(assets_root_dir, target_root_dir)
logger.debug("Target file: %s", target_url)
# Skip when target file exists and overwrite is not enabled
if not self.assets_overwrite:
sess = create_session()
if sess.get(
dinamis_sdk.sign(target_url),
stream=True
).status_code == 200:
logger.info(
"Asset %s already exists. Skipping.",
target_url
)
res = sess.get(dinamis_sdk.sign(target_url), stream=True)
if res.status_code == 200:
logger.info("Asset %s already exists. Skipping.", target_url)
continue
# Upload file
logger.info("Uploading %s ...", local_filename)
try:
dinamis_sdk.push(
local_filename=local_filename,
target_url=target_url
)
dinamis_sdk.push(local_filename=local_filename, target_url=target_url)
except Exception as e:
logger.error(e)
raise (e)
raise e
# Update assets hrefs
logger.debug("Updating assets HREFs ...")
asset.href = target_url
# Push item
logger.info(
"Publishing item \"%s\" in collection \"%s\"",
item.id,
col_id
)
logger.info('Publishing item "%s" in collection "%s"', item.id, col_id)
post_or_put(
urljoin(
self.stac_endpoint,
f"collections/{col_id}/items"
),
item.to_dict(transform_hrefs=False)
urljoin(self.stac_endpoint, f"collections/{col_id}/items"),
item.to_dict(transform_hrefs=False),
)
def publish_items(self, items: List[Item]):
@@ -216,16 +206,12 @@ class TransactionsHandler:
assets_root_dir = get_assets_root_dir(items=items)
logger.debug("Assets root directory: %s", assets_root_dir)
for item in items:
self.publish_item(
item=item,
assets_root_dir=assets_root_dir
)
self.publish_item(item=item, assets_root_dir=assets_root_dir)
def publish_collection(self, collection: Collection):
"""Publish an empty collection"""
"""Publish an empty collection."""
post_or_put(
url=urljoin(self.stac_endpoint, "/collections"),
data=collection.to_dict()
url=urljoin(self.stac_endpoint, "/collections"), data=collection.to_dict()
)
def publish_collection_with_items(self, collection: Collection):
@@ -240,7 +226,7 @@ class TransactionsHandler:
self.publish_items(items=item_collection.items)
def load_and_publish(self, obj_pth: str):
"""Load and publish the serialized STAC object"""
"""Load and publish the serialized STAC object."""
obj = load(obj_pth=obj_pth)
if isinstance(obj, Collection):
self.publish_collection_with_items(collection=obj)
@@ -252,10 +238,8 @@ class TransactionsHandler:
f"(got {type(obj)})"
)
def delete(self, col_id: str, item_id: str = None):
def delete(self, col_id: str, item_id: str | None = None):
"""Delete an item or a collection."""
delete_stac_obj(
stac_endpoint=self.stac_endpoint,
col_id=col_id,
item_id=item_id
stac_endpoint=self.stac_endpoint, col_id=col_id, item_id=item_id
)