Subject: [PATCH 1/3] scraper for discharge data from the grcd

@@ -0,0 +1,177 @@
+use std::io::{Cursor, Read};
+use anyhow::{anyhow, Error, Result};
+use calamine::{open_workbook_from_rs, Data, DataType, Reader, Xlsx};
+use cap_std::fs::Dir;
+use smallvec::smallvec;
+use time::{Date, Month};
+use zip::ZipArchive;
+use harvester::{
+    client::Client,
+    fetch_many,
+    utilities::{make_key, point_like_bounding_box},
+    write_dataset, Source,
+use metadaten::{
+    dataset::{
+        r#type::{Domain, Station, Type},
+        Dataset, Language, License, Organisation, OrganisationRole, Region, SourceUrlExplainer,
+    },
+    wise::WISE,
+pub async fn harvest(dir: &Dir, client: &Client, source: &Source) -> Result<(usize, usize, usize)> {
+    let station_list = fetch_station_list(client, source).await?;
+    let count = station_list.len();
+    let (results, errors) = fetch_many(0, 0, station_list, |station| {
+        translate_station_dataset(dir, client, source, station)
+    })
+    .await;
+    Ok((count, results, errors))
+async fn fetch_station_list(client: &Client, source: &Source) -> Result<Vec<Item>> {
+    let url = source.url.join("grdc/")?;
+    let bytes = client
+        .make_request(
+            source,
+            "station_list".to_owned(),
+            Some(&url),
+            |client| async {
+                let bytes = client
+                    .get(url.clone())
+                    .send()
+                    .await?
+                    .error_for_status()?
+                    .bytes()
+                    .await?;
+                let mut archive = ZipArchive::new(Cursor::new(&*bytes))?;
+                let mut bytes = Vec::new();
+                archive
+                    .by_name("GRDC_Stations.xlsx")?
+                    .read_to_end(&mut bytes)?;
+                Ok::<_, Error>(bytes)
+            },
+        )
+        .await?;
+    let mut workbook: Xlsx<_> = open_workbook_from_rs(Cursor::new(&*bytes))?;
+    let worksheet = workbook.worksheet_range("station_catalogue")?;
+    let station_list = worksheet
+        .rows()
+        .filter(|row| row[5] == "DE")
+        .map(|row| Item {
+            station_id: row[0].to_string(),
+            river: row[3].to_string(),
+            name: row[4].to_string(),
+            lat: row[6].clone(),
+            lon: row[7].clone(),
+            start_year: row[18].clone(),
+            end_year: row[19].clone(),
+        })
+        .collect::<Vec<_>>();
+    Ok(station_list)
+async fn translate_station_dataset(
+    dir: &Dir,
+    client: &Client,
+    source: &Source,
+    item: Item,
+) -> Result<(usize, usize, usize)> {
+    let key = make_key(&item.station_id).into_owned();
+    let url = source
+        .url
+        .join("applications/public.html?publicuser=PublicUser#dataDownload/Stations")?;
+    let title = format!("Messstation {} am Fluss {}",, item.river);
+    let description = format!(
+        r#"Dieser Datensatz enthält Abflussdaten der Messstation {} am Fluss {}.
+        Die Daten werden bereit gestellt vom Global Runoff Data Centre (GRDC).
+        Um die Rohdaten für diese Station zu erhalten, muss das Data Portal des GRDC ({}) verwendet werden.
+        Dort kann man entweder anhand des Station Name: {} oder der Station Number: {} nach den Daten suchen."#,
+, item.river, url,, item.station_id
+    );
+    let types = smallvec![Type::Measurements {
+        domain: Domain::Rivers,
+        station: Some(Station {
+            id: Some(item.station_id.into()),
+            ..Default::default()
+        }),
+        measured_variables: smallvec!["Abfluss".to_owned()],
+        methods: Default::default(),
+    }];
+    let region = Region::Other(;
+    let mut regions = smallvec![region];
+    let lat =|| anyhow!("Missing lat"))?;
+    let lon = item.lon.as_f64().ok_or_else(|| anyhow!("Missing lon"))?;
+    let bounding_boxes = smallvec![point_like_bounding_box(lat, lon)];
+    regions.extend(WISE.match_shape(lon, lat).map(Region::Watershed));
+    // data source only contains start and end year of the time series
+    let start_year = item
+        .start_year
+        .as_i64()
+        .ok_or_else(|| anyhow!("Missing start year"))?;
+    let end_year = item
+        .end_year
+        .as_i64()
+        .ok_or_else(|| anyhow!("Missing start year"))?;
+    let start_date = Date::from_calendar_date(start_year as i32, Month::January, 1)?;
+    let end_date = Date::from_calendar_date(end_year as i32, Month::December, 31)?;
+    let time_ranges = smallvec![(start_date, end_date).into()];
+    let provider = Organisation::WikiData {
+        identifier: 119010386, // GRDC
+        role: OrganisationRole::Publisher,
+    };
+    let dataset = Dataset {
+        title,
+        description: Some(description),
+        types,
+        bounding_boxes,
+        regions,
+        time_ranges,
+        organisations: smallvec![provider],
+        language: Language::English,
+        license: License::OtherClosed,
+        origins:,
+        source_url: url.into(),
+        source_url_explainer: SourceUrlExplainer::CopyStationId(
+            "zum Datenportal: bitte dort die Suchfunktion nutzen (Station Number wird automatisch kopiert)"
+                .to_owned(),
+        ),
+        ..Default::default()
+    };
+    write_dataset(dir, client, source, key, dataset).await
+#[derive(Debug, Clone)]
+struct Item {
+    river: String,
+    name: String,
+    station_id: String,
+    lat: Data,
+    lon: Data,
+    start_year: Data,
+    end_year: Data,
@@ -0,0 +1,191 @@
+use anyhow::Result;
+use cap_std::fs::Dir;
+use scraper::{Html, Selector};
+use smallvec::smallvec;
+use time::{macros::format_description, Date};
+use harvester::{
+    client::Client,
+    fetch_many, selectors,
+    utilities::{collect_text, make_key, select_text},
+    write_dataset, Source,
+use metadaten::dataset::{
+    r#type::{TextType, Type},
+    Dataset, Language, License, Organisation, OrganisationRole, Resource, ResourceType,
+pub async fn harvest(dir: &Dir, client: &Client, source: &Source) -> Result<(usize, usize, usize)> {
+    let selectors = &Selectors::default();
+    let news_list = fetch_news_list(client, source, selectors).await?;
+    let mut count = news_list.len();
+    let (results, errors) = fetch_many(0, 0, news_list, |news_item| {
+        translate_news_dataset(dir, client, source, news_item, selectors)
+    })
+    .await;
+    let publication_list = fetch_publication_list(client, source, selectors).await?;
+    count += publication_list.len();
+    let (results, errors) = fetch_many(results, errors, publication_list, |publication_item| {
+        translate_publication_dataset(dir, client, source, publication_item)
+    })
+    .await;
+    Ok((count, results, errors))
+async fn fetch_news_list(
+    client: &Client,
+    source: &Source,
+    selectors: &Selectors,
+) -> Result<Vec<Item>> {
+    let url = source.url.join("news/")?;
+    let text = client
+        .fetch_text(source, "news_list".to_owned(), &url)
+        .await?;
+    let document = Html::parse_document(&text);
+    let item_list = document
+        .select(&selectors.news_item)
+        .map(|element| {
+            let href = element.attr("href").unwrap();
+            let title = collect_text(element.text());
+            let url = href.trim_start_matches("../").to_owned();
+            Ok(Item { url, title })
+        })
+        .collect::<Result<Vec<_>>>()?;
+    Ok(item_list)
+async fn translate_news_dataset(
+    dir: &Dir,
+    client: &Client,
+    source: &Source,
+    item: Item,
+    selectors: &Selectors,
+) -> Result<(usize, usize, usize)> {
+    let key = make_key(&item.url).into_owned();
+    let url = source.url.join(&item.url)?;
+    let text = client.fetch_text(source, key.clone(), &url).await?;
+    let document = Html::parse_document(&text);
+    let description = select_text(&document, &selectors.news_description);
+    let date = select_text(&document, &selectors.news_date);
+    let issued = Date::parse(
+        &date,
+        format_description!("[month repr:long] [day padding:none], [year]"),
+    )?;
+    let provider = Organisation::WikiData {
+        identifier: 119010386, // GRDC
+        role: OrganisationRole::Publisher,
+    };
+    let dataset = Dataset {
+        title: item.title,
+        description: Some(description),
+        types: smallvec![Type::Text {
+            text_type: TextType::News,
+        }],
+        issued: Some(issued),
+        organisations: smallvec![provider],
+        language: Language::English,
+        license: License::OtherClosed,
+        origins:,
+        source_url: url.into(),
+        ..Default::default()
+    };
+    write_dataset(dir, client, source, key, dataset).await
+async fn fetch_publication_list(
+    client: &Client,
+    source: &Source,
+    selectors: &Selectors,
+) -> Result<Vec<Item>> {
+    let url = source.url.join("publications/reports/")?;
+    let text = client
+        .fetch_text(source, "publication_list".to_owned(), &url)
+        .await?;
+    let document = Html::parse_document(&text);
+    let item_list = document
+        .select(&selectors.publication_all)
+        .map(|element| {
+            let href = element.attr("href").unwrap();
+            let title = collect_text(element.text());
+            let url = href.trim_start_matches("../").to_owned();
+            Ok(Item { url, title })
+        })
+        .collect::<Result<Vec<_>>>()?;
+    Ok(item_list)
+async fn translate_publication_dataset(
+    dir: &Dir,
+    client: &Client,
+    source: &Source,
+    item: Item,
+) -> Result<(usize, usize, usize)> {
+    let key = make_key(&item.title).into_owned();
+    let url = source.url.join(&item.url)?;
+    let description = "Until 2015, GRDC has published annual reports including GRDC meeting reports. Those can be accessed via the link.".to_owned();
+    let provider = Organisation::WikiData {
+        identifier: 119010386, // GRDC
+        role: OrganisationRole::Publisher,
+    };
+    let resources = smallvec![Resource {
+        r#type: ResourceType::Pdf,
+        description: Some("Report of the GRDC".to_owned()),
+        url: url.to_string(),
+        primary_content: true,
+        ..Default::default()
+    }];
+    let dataset = Dataset {
+        title: item.title,
+        description: Some(description),
+        types: smallvec![Type::Text {
+            text_type: TextType::Report,
+        }],
+        resources,
+        organisations: smallvec![provider],
+        language: Language::English,
+        license: License::OtherClosed,
+        origins:,
+        source_url: url.into(),
+        machine_readable_source: false,
+        ..Default::default()
+    };
+    write_dataset(dir, client, source, key, dataset).await
+#[derive(Debug, Clone)]
+struct Item {
+    title: String,
+    url: String,
+selectors! {
+    publication_all: ".table a[href]",
+    publication_link: "a",
+    news_item: ".listing-title .no-external",
+    news_description: ".quarto-figure-center+ p, p+ p",
+    news_date: ".date",

@@ -59,19 +59,24 @@ async fn fetch_organisations(client: &Client, source: &Source) -> Result<HashSet
         387917, // administrative divisions of Germany (Q387917) entities in the administrative structure of Germany
+    const CLASSES: &[u64] = &[
+        5227240, // collection of numeric and/or other data sets for secondary use in research
+    ];
     let mut identifiers = HashSet::new();
     for property in PROPERTIES {
         for legal_form in LEGAL_FORMS {
-            if let Err(err) = fetch_organisation_legal_form(
-                client,
-                source,
-                &mut identifiers,
-                *property,
-                *legal_form,
-            )
-            .await
-            {
+            let query = format!(
+                r#"SELECT DISTINCT ?item WHERE {{
+                    ?item (wdt:P{property}/(wdt:P279*)) wd:Q{legal_form} ;
+                    wdt:P17 wd:Q183 . # Germany (Q183)
+                }}"#
+            );
+            let key = format!("organisations-{property}-{legal_form}.isol");
+            if let Err(err) = fetch_entities(client, source, &mut identifiers, &query, key).await {
                     "Failed to fetch legal form {legal_form} via property {property}: {err:#}"
@@ -79,36 +84,38 @@ async fn fetch_organisations(client: &Client, source: &Source) -> Result<HashSet
+    for class in CLASSES {
+        let query = format!(
+            r#"SELECT DISTINCT ?item WHERE {{
+                    ?item (wdt:P31/(wdt:P279*)) wd:Q{class}
+            }}"#
+        );
+        let key = format!("organisations-31-{class}.isol");
+        if let Err(err) = fetch_entities(client, source, &mut identifiers, &query, key).await {
+            tracing::error!("Failed to fetch class {class}: {err:#}");
+        }
+    }
-async fn fetch_organisation_legal_form(
+async fn fetch_entities(
     client: &Client,
     source: &Source,
     identifiers: &mut HashSet<u64>,
-    property: u64,
-    legal_form: u64,
+    query: &str,
+    key: String,
 ) -> Result<()> {
-    let query = format!(
-        r#"SELECT DISTINCT ?item WHERE {{
-    ?item (wdt:P{property}/(wdt:P279*)) wd:Q{legal_form} ;
-        wdt:P17 wd:Q183 . # Germany (Q183)
-    );
     let mut url = source.url.clone();
-        .append_pair("query", &query)
+        .append_pair("query", query)
         .append_pair("format", "json");
     let bytes = client
-        .checked_fetch_bytes(
-            source,
-            timeout_exception,
-            format!("organisations-{property}-{legal_form}.isol"),
-            &url,
-        )
+        .checked_fetch_bytes(source, timeout_exception, key, &url)
     let response = from_slice::<SparqlResponse>(&bytes)?;

@@ -96,12 +96,15 @@ async fn translate_station_dataset(
-    let title = format!("Messstation {} am Fluss {}",, item.river);
+    let title = format!(
+        "Messstation {} ('Station Number': {}) am Fluss {}",
+, item.station_id, item.river
+    );
     let description = format!(
         r#"Dieser Datensatz enthält Abflussdaten der Messstation {} am Fluss {}.
         Die Daten werden bereit gestellt vom Global Runoff Data Centre (GRDC).
         Um die Rohdaten für diese Station zu erhalten, muss das Data Portal des GRDC ({}) verwendet werden.
-        Dort kann man entweder anhand des Station Name: {} oder der Station Number: {} nach den Daten suchen."#,
+        Dort kann man entweder anhand des 'Station Name': {} oder der 'Station Number': {} nach den Daten suchen."#,, item.river, url,, item.station_id
@@ -156,8 +159,7 @@ async fn translate_station_dataset(
         source_url: url.into(),
         source_url_explainer: SourceUrlExplainer::CopyStationId(
-            "zum Datenportal: bitte dort die Suchfunktion nutzen (Station Number wird automatisch kopiert)"
-                .to_owned(),
+            "zum Datenportal: bitte dort mit 'Station Number' die Suchfunktion nutzen".to_owned(),