Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sed/core/config_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class PathsModel(BaseModel):

raw: DirectoryPath
processed: Optional[Union[DirectoryPath, NewPath]] = None
meta: Optional[Union[DirectoryPath, NewPath]] = None


class CopyToolModel(BaseModel):
Expand Down
54 changes: 51 additions & 3 deletions src/sed/loader/flash/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@
set_verbosity(logger, self._verbose)

self.instrument: str = self._config["core"].get("instrument", "hextof") # default is hextof
self.beamtime_dir: str = None
self.raw_dir: str = None
self.processed_dir: str = None
self.meta_dir: str = None

@property
def verbose(self) -> bool:
Expand Down Expand Up @@ -111,9 +113,14 @@
# Only raw_dir is necessary, processed_dir can be based on raw_dir, if not provided
if "paths" in self._config["core"]:
raw_dir = Path(self._config["core"]["paths"].get("raw", ""))
print(raw_dir)
processed_dir = Path(
self._config["core"]["paths"].get("processed", raw_dir.joinpath("processed")),
)
meta_dir = Path(
self._config["core"]["paths"].get("meta", raw_dir.joinpath("meta")),
)
beamtime_dir = Path(raw_dir).parent

else:
try:
Expand Down Expand Up @@ -147,11 +154,14 @@
raw_dir = raw_paths[0].resolve()

processed_dir = beamtime_dir.joinpath("processed")
meta_dir = beamtime_dir.joinpath("meta/fabtrack/")

Check warning on line 157 in src/sed/loader/flash/loader.py

View workflow job for this annotation

GitHub Actions / lint

Unknown word (fabtrack)

processed_dir.mkdir(parents=True, exist_ok=True)

self.beamtime_dir = str(beamtime_dir)
self.raw_dir = str(raw_dir)
self.processed_dir = str(processed_dir)
self.meta_dir = str(meta_dir)

@property
def available_runs(self) -> list[int]:
Expand Down Expand Up @@ -223,7 +233,7 @@
# Return the list of found files
return [str(file.resolve()) for file in files]

def parse_metadata(self, token: str = None) -> dict:
def parse_scicat_metadata(self, token: str = None) -> dict:
"""Uses the MetadataRetriever class to fetch metadata from scicat for each run.

Returns:
Expand All @@ -239,6 +249,23 @@

return metadata

def parse_local_metadata(self) -> dict:
"""Uses the MetadataRetriever class to fetch metadata from local folder for each run.

Returns:
dict: Metadata dictionary
"""
metadata_retriever = MetadataRetriever(self._config["metadata"])
metadata = metadata_retriever.get_local_metadata(
beamtime_id=self._config["core"]["beamtime_id"],
beamtime_dir=self.beamtime_dir,
meta_dir=self.meta_dir,
runs=self.runs,
metadata=self.metadata,
)

return metadata

def get_count_rate(self, fids=None, **kwds) -> tuple[np.ndarray, np.ndarray]:
"""
Calculates the count rate using the number of rows and elapsed time for each file.
Expand Down Expand Up @@ -362,7 +389,7 @@
folders: str | Sequence[str] = None,
runs: str | int | Sequence[str | int] = None,
ftype: str = "h5",
metadata: dict = {},
metadata: dict | None = None,
collect_metadata: bool = False,
**kwds,
) -> tuple[dd.DataFrame, dd.DataFrame, dict]:
Expand Down Expand Up @@ -406,6 +433,9 @@
FileNotFoundError: If the conversion fails for some files or no data is available.
ValueError: If collect_metadata is True and no token is available.
"""
if metadata is None:
metadata = {}

detector = kwds.pop("detector", "")
force_recreate = kwds.pop("force_recreate", False)
processed_dir = kwds.pop("processed_dir", None)
Expand Down Expand Up @@ -465,7 +495,25 @@
if self.instrument == "wespe":
df, df_timed = wespe_convert(df, df_timed)

self.metadata.update(self.parse_metadata(token) if collect_metadata else {})
# self.metadata.update(self.parse_metadata(token) if collect_metadata else {})
# if len(self.parse_scicat_metadata(token)) == 0:

# print("No SciCat metadata available, checking local folder")
# self.metadata.update(self.parse_local_metadata())
# else:
# print("Metadata taken from SciCat")
# self.metadata.update(self.parse_scicat_metadata(token) if collect_metadata else {})
scicat_metadata = self.parse_scicat_metadata(token)
scicat_runs = scicat_metadata.get("scientificMetadata", {})

if not any(scicat_runs.values()):
logger.warning("No SciCat metadata available, checking local folder")
self.metadata.update(self.parse_local_metadata())
else:
logger.warning("Metadata taken from SciCat")
if collect_metadata:
self.metadata.update(scicat_metadata)

self.metadata.update(bh.metadata)

logger.info(f"Loading complete in {time.time() - t0: .2f} s")
Expand Down
203 changes: 169 additions & 34 deletions src/sed/loader/flash/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from __future__ import annotations

import requests
import json, yaml
from pathlib import Path

from sed.core.config import read_env_var
from sed.core.config import save_env_var
Expand All @@ -15,8 +17,7 @@

class MetadataRetriever:
"""
A class for retrieving metadata from a Scicat instance based
on beamtime and run IDs.
Retrieves metadata from SciCat or local YAML files for a given beamtime and runs.
"""

def __init__(self, metadata_config: dict, token: str = None) -> None:
Expand Down Expand Up @@ -55,10 +56,11 @@ def get_metadata(
self,
beamtime_id: str,
runs: list,
metadata: dict = None,
metadata: dict | None = None,
) -> dict:
"""
Retrieves metadata for a given beamtime ID and list of runs.
Retrieves metadata for a beamtime and runs from SciCat.
Returns a dict with 'scientificMetadata' keyed by run ID.

Args:
beamtime_id (str): The ID of the beamtime.
Expand All @@ -77,12 +79,17 @@ def get_metadata(
if metadata is None:
metadata = {}

all_runs_metadata: dict[str, dict] = {}

for run in runs:
pid = f"{beamtime_id}/{run}"
logger.debug(f"Retrieving metadata for PID: {pid}")
# logger.debug(f"Retrieving metadata for PID: {pid}")
metadata_run = self._get_metadata_per_run(pid)
metadata.update(metadata_run) # TODO: Not correct for multiple runs
# metadata.update(metadata_run) # TODO: Not correct for multiple runs
# Use 'scientificMetadata' if available, otherwise entire dict
all_runs_metadata[run] = metadata_run.get("scientificMetadata", metadata_run)

metadata["scientificMetadata"] = all_runs_metadata
logger.debug(f"Retrieved metadata with {len(metadata)} entries")
return metadata

Expand All @@ -103,44 +110,172 @@ def _get_metadata_per_run(self, pid: str) -> dict:
headers2["Authorization"] = f"Bearer {self.token}"

try:
logger.debug(f"Attempting to fetch metadata with new URL format for PID: {pid}")
dataset_response = requests.get(
self._create_new_dataset_url(pid),
headers=headers2,
timeout=10,
)
dataset_response.raise_for_status()
# logger.debug(f"Attempting to fetch metadata with new URL format for PID: {pid}")
# dataset_response = requests.get(
# self._create_new_dataset_url(pid),
# headers=headers2,
# timeout=10,
# )
# dataset_response.raise_for_status()
logger.debug(f"Fetching metadata (new URL) for PID: {pid}")
response = requests.get(self._create_new_dataset_url(pid), headers=headers2, timeout=10)
response.raise_for_status()

# Check if response is an empty object because wrong url for older implementation
if not dataset_response.content:
# if not dataset_response.content:
# logger.debug("Empty response, trying old URL format")
# dataset_response = requests.get(
# self._create_old_dataset_url(pid),
# headers=headers2,
# timeout=10,
# )
if not response.content:
logger.debug("Empty response, trying old URL format")
dataset_response = requests.get(
self._create_old_dataset_url(pid),
headers=headers2,
timeout=10,
)
response = requests.get(self._create_old_dataset_url(pid), headers=headers2, timeout=10)
# If the dataset request is successful, return the retrieved metadata
# as a JSON object
return dataset_response.json()
# return dataset_response.json()
return response.json()
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to retrieve metadata for PID {pid}: {e}")
return {}

except requests.exceptions.RequestException as exception:
logger.warning(f"Failed to retrieve metadata for PID {pid}: {str(exception)}")
return {} # Return an empty dictionary for this run
# except requests.exceptions.RequestException as exception:
# logger.warning(f"Failed to retrieve metadata for PID {pid}: {str(exception)}")
# return {} # Return an empty dictionary for this run

def _create_old_dataset_url(self, pid: str) -> str:
return "{burl}/{url}/%2F{npid}".format(
burl=self.url,
url="Datasets",
npid=self._reformat_pid(pid),
)
return f"{self.url}datasets/%2F{self._reformat_pid(pid)}"

def _create_new_dataset_url(self, pid: str) -> str:
return "{burl}/{url}/{npid}".format(
burl=self.url,
url="Datasets",
npid=self._reformat_pid(pid),
)
return f"{self.url}datasets/{self._reformat_pid(pid)}"

def _reformat_pid(self, pid: str) -> str:
"""SciCat adds a pid-prefix + "/" but at DESY prefix = "" """
return (pid).replace("/", "%2F")
"""Replace '/' with '%2F' for SciCat PID."""
return pid.replace("/", "%2F")

# ----------------------------
# Local metadata
# ----------------------------
def get_local_metadata(
self,
beamtime_id: str,
beamtime_dir: str | Path,
meta_dir: str | Path,
runs: list,
metadata: dict | None = None,
) -> dict:
"""
Retrieves metadata for a beamtime and runs from local YAML files.
Returns a dict with 'scientificMetadata' keyed by run ID.

Args:
beamtime_dir (str)|Path: Beamtime directory.
meta_dir (str)|Path: Local metadata directory..
runs (list): A list of run IDs.
metadata (dict, optional): The existing metadata dictionary.
Defaults to None.

Returns:
Dict: The updated metadata dictionary.

Raises:
Exception: If the request to retrieve metadata fails.
"""
if metadata is None:
metadata = {}

beamtime_metadata = self._get_beamtime_metadata(beamtime_dir, beamtime_id)
metadata.update(beamtime_metadata)

all_runs_metadata: dict[str, dict] = {}

for run in runs:
logger.debug(f"Retrieving local metadata for run: {run}")
run_metadata = self._get_local_metadata_per_run(meta_dir, run)
all_runs_metadata[run] = run_metadata.get("_data", {})

metadata["scientificMetadata"] = all_runs_metadata

logger.debug(f"Retrieved metadata with {len(metadata)} entries")
return metadata

def _get_beamtime_metadata(
self,
beamtime_dir: str | Path,
beamtime_id: str,
) -> dict:
"""
Retrieves general metadata for a given beamtime ID from beamtime-metadata-{beamtime_id}.json file

Args:
beamtime_dir (str)|Path: Beamtime directory.
beamtime_id (str): The ID of the beamtime.

Returns:
Dict: The retrieved metadata dictionary.

Raises:
Exception: If the request to retrieve metadata fails.
"""
try:
beamtime_dir = Path(beamtime_dir)
filepath = beamtime_dir / f"beamtime-metadata-{beamtime_id}.json"
with filepath.open("r") as f:
return json.load(f)
except Exception as exc:
logger.warning(f"Failed to retrieve metadata for beamtime {beamtime_id}: {exc}")
return {}


def _get_local_metadata_per_run(self, meta_dir: str | Path, run: str) -> dict:
"""
Retrieves metadata for a specific run from the latest YAML file:
{run}_N.yaml (highest N) or fallback to {run}.yaml

Args:
meta_dir(str): The existing local metadata folder.

Returns:
dict: The retrieved metadata.

Raises:
Exception: If the request to retrieve metadata fails.
"""
try:
# run = str(run)
meta_dir = Path(meta_dir)

# candidates = []
run = str(run)
candidates: list[tuple[int, Path]] = []

# Look for versioned YAML files
for path in meta_dir.glob(f"{run}_*.yaml"):
try:
version = int(path.stem.split("_")[-1])
candidates.append((version, path))
except ValueError:
continue

# Fallback: unversioned single file
if not candidates:
single_file = meta_dir / f"{run}.yaml"
if single_file.exists():
candidates.append((0, single_file))

if not candidates:
raise FileNotFoundError(f"No metadata files found for run {run} in {meta_dir}")

# Pick the latest version
_, latest_path = max(candidates, key=lambda x: x[0])

logger.info(f"Loading local metadata from {latest_path.name}")

run_metadata = yaml.safe_load(latest_path.read_text())
return run_metadata or {"_data": {}}

except Exception as exc:
logger.warning(f"Failed to retrieve local metadata for run {run}: {exc}")
return {"_data": {}}
Loading