1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
//! # Registry Sync
//!
//! In order for builds.rs to do its job, it needs to have an up-to-date list of crates at all
//! times. This crate is responsible for doing that, by implementing a one-way synchronization from
//! a Rust registry to the buildsrs database.
//!
//! Rust package registries (such as [crates.io](https://crates.io)) have several ways for getting
//! data out of them, including an HTTP API, nightly database dumps and the Git index. The latter
//! is a Git repository that contains all crate metadata. This index was chosen as the source of
//! data for synchronization purposes, because it is relatively straightforward to consume.
//!
//! This crate exports a [`Syncer`] type, which implements the synchronization between a given
//! Git index and a database connection.

use anyhow::{anyhow, Result};
use buildsrs_database::AnyMetadata;
use chrono::{SecondsFormat, Utc};
use crates_index::GitIndex;
use futures::{
    future::{join, ready, FutureExt},
    stream::{iter, once, StreamExt, TryStreamExt},
};
use log::*;
use std::{sync::Arc, time::Duration};
use tokio::{
    sync::{mpsc::channel, Mutex},
    task::spawn_blocking,
    time::{self, Instant, MissedTickBehavior},
};
use tokio_stream::wrappers::ReceiverStream;
use tracing::instrument;

/// Synchronize a package registry with the database.
///
/// Holds the database connection that receives the writes and the Git index that serves as the
/// source of crate metadata.
pub struct Syncer {
    /// Database that crates and crate versions are written to.
    database: AnyMetadata,
    /// Crates index, behind an async (tokio) mutex inside an `Arc` so that an owned lock guard
    /// can be moved into blocking tasks by both index updates and synchronization runs.
    index: Arc<Mutex<GitIndex>>,
}

/// Capacity of the bounded queue that hands crates from the blocking index reader to the
/// database writer (1024 entries).
const CRATES_QUEUE_LENGTH: usize = 1 << 10;
/// Upper bound on concurrently in-flight database requests during a sync (128 requests).
const DATABASE_PIPELINED_REQUESTS: usize = 1 << 7;

impl Syncer {
    /// Create new instance, given a database connection and a [`GitIndex`].
    ///
    /// The index is wrapped in an `Arc<Mutex<_>>` so that [`Syncer::update`] and
    /// [`Syncer::sync`] can move an owned lock guard into a blocking task.
    pub fn new(database: AnyMetadata, index: GitIndex) -> Self {
        Self {
            database,
            index: Arc::new(Mutex::new(index)),
        }
    }

    /// Updates crates index.
    ///
    /// This will cause a network access, because it will attempt to fetch the latest state from
    /// the remote crates index using git.
    ///
    /// # Errors
    ///
    /// Returns an error if the blocking task fails to complete or if the index update fails.
    #[instrument(skip(self))]
    pub async fn update(&self) -> Result<()> {
        let instant = Instant::now();
        // An owned guard is required because it is moved into a `spawn_blocking` closure; the
        // update runs there so it does not stall the async executor.
        let mut index = self.index.clone().lock_owned().await;
        spawn_blocking(move || index.update()).await??;
        info!("Updated crates in {:?}", instant.elapsed());
        Ok(())
    }

    /// Synchronize crate index with database.
    ///
    /// Opens a write handle on the database and locks the index. It then streams every crate
    /// from a blocking reader task through a bounded channel of [`CRATES_QUEUE_LENGTH`] entries.
    /// For each crate it issues one `crate_add` plus one `crate_version_add` per version, keeping
    /// up to [`DATABASE_PIPELINED_REQUESTS`] requests in flight. The handle is committed only if
    /// every write succeeded.
    ///
    /// # Errors
    ///
    /// Returns an error if the write handle cannot be opened, the index config cannot be read, a
    /// database write fails, a version has no download URL, the reader task fails, or the commit
    /// fails. When a database write fails, that error is reported in preference to the
    /// resulting "channel closed" error from the reader.
    #[instrument(skip(self))]
    pub async fn sync(&self) -> Result<()> {
        let handle = self.database.write().await.map_err(|e| anyhow!(e))?;
        let index = self.index.clone().lock_owned().await;
        let config = index.index_config()?;
        let (sender, receiver) = channel(CRATES_QUEUE_LENGTH);
        let start = Instant::now();

        // launch a blocking reader which emits a stream of crates into a queue
        let reader = spawn_blocking(move || {
            let mut count: usize = 0;
            for krate in index.crates() {
                // Only fails when the receiver is gone, i.e. the writer has already stopped.
                sender.blocking_send(krate)?;
                count += 1;
            }
            Ok(count) as Result<usize>
        });

        // launch a writer, which turns the crates into a stream of database
        // writes. the database writes are pipelined.
        //
        // NOTE(review): `buffer_unordered` does not wait for a crate's `crate_add` to finish
        // before starting that crate's `crate_version_add` futures; this presumably relies on
        // the database handle executing requests in submission order — confirm.
        let handle_ref = &handle;
        let config = &config;
        let writer = async move {
            ReceiverStream::new(receiver)
                .enumerate()
                .flat_map(move |(position, krate)| {
                    debug!("Syncing crate #{position} {}", krate.name());
                    let name = krate.name().to_string();
                    let versions = krate.versions().to_vec();
                    // The stream deliberately yields (boxed) futures rather than results so that
                    // `buffer_unordered` below can drive many of them concurrently.
                    #[allow(clippy::async_yields_async)]
                    once(ready(
                        async move {
                            handle_ref.crate_add(&name).await.map_err(|e| anyhow!(e))?;
                            Ok(()) as Result<()>
                        }
                        .boxed(),
                    ))
                    .chain(iter(versions.into_iter()).map(move |version| {
                        let name = krate.name().to_string();
                        async move {
                            handle_ref
                                .crate_version_add(
                                    &name,
                                    version.version(),
                                    &version
                                        .download_url(config)
                                        // Lazily build the error: an eager `anyhow!` here would
                                        // allocate (and possibly capture a backtrace) for every
                                        // single version, even when the URL is present.
                                        .ok_or_else(|| {
                                            anyhow!("Missing download URL for {name}")
                                        })?,
                                    version.checksum(),
                                    version.is_yanked(),
                                )
                                .await
                                .map_err(|e| anyhow!(e))?;
                            Ok(()) as Result<()>
                        }
                        .boxed()
                    }))
                })
                .buffer_unordered(DATABASE_PIPELINED_REQUESTS)
                .try_collect::<()>()
                .await?;

            Ok(()) as Result<_>
        };

        let (reader, writer) = join(reader, writer).await;
        // Check the writer first: when it fails it drops the receiver, which makes the reader's
        // `blocking_send` fail with a generic "channel closed" error that would otherwise hide
        // the actual cause.
        writer?;
        let count = reader??;

        info!("Wrote {count} crates in {:?}", start.elapsed());

        info!("Committing changes");
        handle.commit().await.map_err(|e| anyhow!(e))?;

        Ok(())
    }

    /// Run a single iteration of the sync loop.
    ///
    /// Fetches the latest index state ([`Syncer::update`]) and then writes it to the database
    /// ([`Syncer::sync`]).
    ///
    /// # Errors
    ///
    /// Propagates the first error from either step; `sync` is skipped if `update` fails.
    #[instrument(skip(self), fields(start = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true)))]
    pub async fn iteration(&mut self) -> Result<()> {
        info!("Updating crate index");
        self.update().await?;

        info!("Synchronizing crate index");
        self.sync().await?;

        Ok(())
    }

    /// Launch a synchronization loop.
    ///
    /// Runs [`Syncer::iteration`] once per `interval` tick. The first tick fires immediately,
    /// and missed ticks are skipped rather than bursted.
    ///
    /// # Errors
    ///
    /// Never returns `Ok`; it returns only when an iteration fails, with that iteration's error.
    #[instrument(skip(self))]
    pub async fn sync_loop(&mut self, interval: Duration) -> Result<()> {
        info!("Launching sync loop");
        let mut timer = time::interval(interval);
        timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            let instant = timer.tick().await;
            self.iteration().await?;
            info!("Finished iteration in {:?}", instant.elapsed());
        }
    }
}