//! # Registry Sync
//!
//! In order for builds.rs to do its job, it needs an up-to-date list of crates at all
//! times. This crate is responsible for keeping that list current by implementing a one-way
//! synchronization from a Rust registry to the buildsrs database.
//!
//! Rust package registries (such as [crates.io](https://crates.io)) offer several ways of getting
//! data out of them, including an HTTP API, nightly database dumps and the Git index. The latter
//! is a Git repository that contains the metadata of every crate. The Git index was chosen as the
//! source of data for synchronization because it is relatively straightforward to consume.
//!
//! This crate exports a [`Syncer`] type, which implements the synchronization between a given
//! Git index and a database connection.
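//!
//! ## Example
//!
//! The snippet below is a minimal sketch of how the pieces fit together, with this crate's
//! [`Syncer`] in scope. It is marked `ignore` because obtaining an [`AnyMetadata`] handle
//! depends on how the `buildsrs_database` backend is configured; the `run` function and the
//! one-hour interval are purely illustrative.
//!
//! ```ignore
//! use buildsrs_database::AnyMetadata;
//! use crates_index::GitIndex;
//! use std::time::Duration;
//!
//! async fn run(database: AnyMetadata) -> anyhow::Result<()> {
//!     // Open the local clone of the crates.io git index from Cargo's default location.
//!     let index = GitIndex::new_cargo_default()?;
//!     let mut syncer = Syncer::new(database, index);
//!     // Fetch and synchronize once per hour; this only returns if an iteration fails.
//!     syncer.sync_loop(Duration::from_secs(60 * 60)).await
//! }
//! ```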
use anyhow::{anyhow, Result};
use buildsrs_database::AnyMetadata;
use chrono::{SecondsFormat, Utc};
use crates_index::GitIndex;
use futures::{
future::{join, ready, FutureExt},
stream::{iter, once, StreamExt, TryStreamExt},
};
use log::*;
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{mpsc::channel, Mutex},
task::spawn_blocking,
time::{self, Instant, MissedTickBehavior},
};
use tokio_stream::wrappers::ReceiverStream;
use tracing::instrument;
/// Synchronize a package registry with the database.
pub struct Syncer {
database: AnyMetadata,
index: Arc<Mutex<GitIndex>>,
}
/// Length of the crates queue.
const CRATES_QUEUE_LENGTH: usize = 1024;
/// How many database requests to keep in-flight in parallel at any given time.
const DATABASE_PIPELINED_REQUESTS: usize = 128;
impl Syncer {
/// Create a new instance, given a database connection and a [`GitIndex`].
pub fn new(database: AnyMetadata, index: GitIndex) -> Self {
Self {
database,
index: Arc::new(Mutex::new(index)),
}
}
/// Update the crates index.
///
/// This causes network access, because it fetches the latest state of the remote crates
/// index using git.
#[instrument(skip(self))]
pub async fn update(&self) -> Result<()> {
let instant = Instant::now();
let mut index = self.index.clone().lock_owned().await;
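// the git fetch is blocking, so run it on the blocking thread pool; the first `?`
// propagates the join error, the second the index update error itself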
spawn_blocking(move || index.update()).await??;
info!("Updated crates in {:?}", instant.elapsed());
Ok(())
}
/// Synchronize crate index with database.
#[instrument(skip(self))]
pub async fn sync(&self) -> Result<()> {
let handle = self.database.write().await.map_err(|e| anyhow!(e))?;
let index = self.index.clone().lock_owned().await;
let config = index.index_config()?;
let (sender, receiver) = channel(CRATES_QUEUE_LENGTH);
let start = Instant::now();
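// the sync is split into a blocking reader and an async writer: the reader walks the git
// index on its own thread and feeds crates into the bounded channel above, while the
// writer drains the channel and issues pipelined database writes. both halves run
// concurrently and are joined at the end.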
// launch a blocking reader which emits a stream of crates into a queue
let reader = spawn_blocking(move || {
let mut count: usize = 0;
for krate in index.crates() {
sender.blocking_send(krate)?;
count += 1;
}
Ok(count) as Result<usize>
});
// launch a writer, which turns the crates into a stream of database
// writes. the database writes are pipelined.
let handle_ref = &handle;
let config = &config;
let writer = async move {
ReceiverStream::new(receiver)
.enumerate()
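// for every crate, queue one `crate_add` call followed by one `crate_version_add`
// call per version; `buffer_unordered` below keeps many of these futures in flight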
.flat_map(move |(index, krate)| {
debug!("Syncing crate #{index} {}", krate.name());
let name = krate.name().to_string();
let versions = krate.versions().to_vec();
#[allow(clippy::async_yields_async)]
once(ready(
async move {
handle_ref.crate_add(&name).await.map_err(|e| anyhow!(e))?;
Ok(()) as Result<()>
}
.boxed(),
))
.chain(iter(versions.into_iter()).map(move |version| {
let name = krate.name().to_string();
async move {
handle_ref
.crate_version_add(
&name,
version.version(),
&version
.download_url(config)
.ok_or(anyhow!("Missing download URL for {name}"))?,
version.checksum(),
version.is_yanked(),
)
.await
.map_err(|e| anyhow!(e))?;
Ok(()) as Result<()>
}
.boxed()
}))
})
.buffer_unordered(DATABASE_PIPELINED_REQUESTS)
.try_collect::<()>()
.await?;
Ok(()) as Result<_>
};
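// run reader and writer to completion concurrently; the reader reports how many crates it
// emitted, and the double `?` unwraps the join result and the reader's own result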
let (reader, writer) = join(reader, writer).await;
let count = reader??;
writer?;
info!("Wrote {count} crates in {:?}", start.elapsed());
info!("Committing changes");
handle.commit().await.map_err(|e| anyhow!(e))?;
Ok(())
}
/// Run a single iteration of the sync loop.
#[instrument(skip(self), fields(start = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true)))]
pub async fn iteration(&mut self) -> Result<()> {
info!("Updating crate index");
self.update().await?;
info!("Synchronizing crate index");
self.sync().await?;
Ok(())
}
/// Launch a synchronization loop.
///
/// This runs [`Syncer::iteration`] on the given interval and only returns if an iteration
/// fails.
#[instrument(skip(self))]
pub async fn sync_loop(&mut self, interval: Duration) -> Result<()> {
info!("Launching sync loop");
let mut timer = time::interval(interval);
timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
let instant = timer.tick().await;
self.iteration().await?;
info!("Finished iteration in {:?}", instant.elapsed());
}
}
}