#![allow(missing_docs)]
use super::*;
use deadpool::unmanaged::{Object, Pool as Deadpool};
use futures::Stream;
use ouroboros::self_referencing;
use ssh_key::{HashAlg, PublicKey};
use std::{collections::BTreeSet, ops::Deref, pin::Pin, sync::Arc};
use tokio::task::JoinHandle;
use tokio_postgres::{connect, AsyncMessage, Client, GenericClient, NoTls, Statement};
pub use tokio_postgres::{Error, Transaction};
use uuid::Uuid;
#[macro_use]
mod macros;
pub mod entity;
#[cfg(feature = "temp")]
mod temp;
mod util;
use entity::*;
#[cfg(feature = "temp")]
pub use temp::*;
// Declares every SQL statement used by this module.
//
// The `statements!` macro is defined in the `macros` module and is not visible here.
// Judging from how its output is used in this file, each `fn` entry becomes a method
// on `Database` that runs the given SQL with the listed parameters, and each `let`
// entry becomes a prepared-statement field on `Statements`. Confirm against the macro
// definition before relying on this.
//
// Only plain `//` comments are used inside the invocation so that the token stream
// handed to the macro is unchanged.
statements!(
// Registers a builder with the given UUID and public-key row id.
fn builder_register(uuid: Uuid, pubkey: i64) {
"INSERT INTO builders(uuid, pubkey)
VALUES ($1, $2)"
}
// Records a fingerprint for a public key; an existing entry is left untouched.
fn fingerprint_add(pubkey: i64, fingerprint: &str) {
"INSERT INTO pubkey_fingerprints(pubkey, fingerprint)
VALUES ($1, $2)
ON CONFLICT DO NOTHING"
}
// Enables or disables a builder.
fn builder_set_enabled(uuid: Uuid, enabled: bool) {
"UPDATE builders
SET enabled = $2
WHERE uuid = $1"
}
// Sets the free-form comment of a builder.
fn builder_set_comment(uuid: Uuid, comment: &str) {
"UPDATE builders
SET comment = $2
WHERE uuid = $1"
}
// Associates a builder with a target triple (both looked up by their public names).
fn builder_triple_add(builder: Uuid, triple: &str) {
"INSERT INTO builder_triples(builder, triple)
VALUES (
(SELECT id FROM builders WHERE uuid = $1),
(SELECT id FROM triples WHERE name = $2)
)
ON CONFLICT DO NOTHING"
}
// Removes the association between a builder and a target triple.
fn builder_triple_remove(builder: Uuid, triple: &str) {
"DELETE FROM builder_triples
WHERE builder = (SELECT id FROM builders WHERE uuid = $1)
AND triple = (SELECT id FROM triples WHERE name = $2)"
}
// NOTE(review): placeholder query. It declares two parameters but the SQL uses none,
// so executing it with both arguments is expected to fail; confirm before use.
fn builder_request(builder: Uuid, triple: &str) {
"SELECT 1"
}
// Adds a target triple; an existing triple is left untouched.
fn triple_add(name: &str) {
"INSERT INTO triples(name) VALUES ($1)
ON CONFLICT DO NOTHING"
}
// Deletes a target triple by name.
fn triple_remove(name: &str) {
"DELETE FROM triples
WHERE name = $1"
}
// Enables or disables a target triple.
fn triple_enabled(name: &str, enabled: bool) {
"UPDATE triples
SET enabled = $2
WHERE name = $1"
}
// Renames the triple currently called `triple` to `name`.
fn triple_rename(triple: &str, name: &str) {
"UPDATE triples
SET name = $2
WHERE name = $1"
}
// Adds a crate; an existing crate is left untouched.
fn crate_add(name: &str) {
"INSERT INTO crates(name) VALUES ($1)
ON CONFLICT DO NOTHING"
}
// Adds a crate version by inserting into `crate_versions_view`.
fn crate_version_add(krate: &str, version: &str, url: &str, checksum: &str, yanked: bool) {
"INSERT INTO crate_versions_view(name, version, url, checksum, yanked)
VALUES ($1, $2, $3, $4, $5)"
}
// Moves a job to the named stage.
fn job_stage(job: Uuid, stage: &str) {
"UPDATE jobs
SET stage = (SELECT id FROM job_stages WHERE name = $2)
WHERE uuid = $1"
}
// Appends a log line to a job, tagged with the job's current stage.
fn job_log(job: Uuid, line: &str) {
"INSERT INTO job_logs(job, stage, line)
VALUES (
(SELECT id FROM jobs WHERE uuid = $1),
(SELECT stage FROM jobs WHERE uuid = $1),
$2
)"
}
// Builder UUID for a public-key fingerprint.
let builder_by_fingerprint = "
SELECT uuid
FROM builders
JOIN pubkey_fingerprints_view
ON builders.pubkey = pubkey_fingerprints_view.id
WHERE fingerprint = $1
";
// Full builder record by UUID.
let builder_get = "
SELECT *
FROM builders_view
WHERE uuid = $1
";
// UUIDs of all builders.
let builder_list = "
SELECT uuid
FROM builders
";
// Names of the triples associated with a builder.
let builder_triples = "
SELECT triple_name
FROM builder_triples_view
WHERE builder_uuid = $1
";
// Names of all triples.
let triple_list = "
SELECT name
FROM triples
";
// Full triple record by name.
let triple_info = "
SELECT *
FROM triples
WHERE name = $1
";
// Crate names matching $1 via the `%` operator
// (presumably pg_trgm similarity; confirm against the schema).
let crate_list = "
SELECT name
FROM crates
WHERE name % $1
";
// Full crate record by name.
let crate_info = "
SELECT *
FROM crates
WHERE name = $1
";
// Versions known for a crate.
let crate_versions = "
SELECT version
FROM crate_versions_view
WHERE name = $1
";
// Full record for one crate version.
let version_info = "
SELECT *
FROM crate_versions_view
WHERE name = $1
AND version = $2
";
// Tasks filtered by crate, version, kind and triple. A NULL parameter makes the
// comparison NULL, which `coalesce(..., true)` turns into "no filter".
let task_list = "
SELECT *
FROM tasks_view
WHERE coalesce(crate = $1, true)
AND coalesce(version = $2, true)
AND coalesce(kind = $3, true)
AND coalesce(triple = $4, true)
";
// Creates a job: $1 is the builder UUID, $2 the new job UUID. The task is the first
// row of `tasks_queue` and the stage starts at 'init'.
let job_create = "
INSERT INTO jobs(uuid, builder, task, stage)
VALUES (
$2,
(SELECT id FROM builders WHERE uuid = $1),
(SELECT id FROM tasks_queue LIMIT 1),
(SELECT id FROM job_stages WHERE name = 'init')
)
RETURNING (uuid)
";
// Full job record by UUID.
let job_info = "
SELECT *
FROM jobs_view
WHERE uuid = $1
";
// Inserts an encoded public key and returns its id.
// NOTE(review): with DO NOTHING, RETURNING yields no row when the key already
// exists, so callers must be prepared for an empty result.
let pubkey_add = "
INSERT INTO pubkeys (encoded)
VALUES ($1)
ON CONFLICT (encoded)
DO NOTHING
RETURNING id;
";
);
// Embeds the SQL migrations found under `migrations`. Only compiled when the
// `migrations` feature is enabled or when building tests.
#[cfg(any(feature = "migrations", test))]
refinery::embed_migrations!("migrations");
/// Database handle pairing a client-like connection with the prepared statements
/// used to query it.
///
/// `T` defaults to [`Client`]; `Database<Transaction<'_>>` is what
/// `Database::transaction` returns.
#[derive(Clone, Debug)]
pub struct Database<T = Client> {
/// Prepared statements, reference-counted so a transaction-backed `Database`
/// can share them with the `Database` it was created from.
statements: Arc<Statements>,
/// Client or transaction on which queries are executed.
connection: T,
}
/// Queries that only require a [`GenericClient`], so they are available both on a
/// plain connection and inside a transaction.
///
/// Every method returns the underlying [`Error`] if the query fails or a column
/// cannot be read. Methods built on `query_one` additionally fail when the query
/// does not return exactly one row.
impl<T: GenericClient> Database<T> {
    /// Returns the UUID of the builder whose public key has the given fingerprint.
    pub async fn builder_lookup(&self, fingerprint: &str) -> Result<Uuid, Error> {
        let row = self
            .connection
            .query_one(&self.statements.builder_by_fingerprint, &[&fingerprint])
            .await?;
        row.try_get("uuid")
    }

    /// Fetches the builder record for `builder`.
    ///
    /// # Panics
    ///
    /// Panics if the stored public key is not valid OpenSSH. The error type of this
    /// function cannot carry a parse failure, so corrupt data is treated as a
    /// broken invariant.
    pub async fn builder_get(&self, builder: Uuid) -> Result<Builder, Error> {
        let row = self
            .connection
            .query_one(&self.statements.builder_get, &[&builder])
            .await?;
        let encoded: &str = row.try_get("pubkey")?;
        let public_key = PublicKey::from_openssh(encoded)
            .expect("builder `pubkey` column must contain a valid OpenSSH public key");
        Ok(Builder {
            uuid: builder,
            public_key,
            comment: row.try_get("comment")?,
            enabled: row.try_get("enabled")?,
        })
    }

    /// Lists the UUIDs of all builders.
    pub async fn builder_list(&self) -> Result<Vec<Uuid>, Error> {
        let rows = self
            .connection
            .query(&self.statements.builder_list, &[])
            .await?;
        rows.into_iter().map(|row| row.try_get("uuid")).collect()
    }

    /// Returns the names of the triples associated with `builder`.
    pub async fn builder_triples(&self, builder: Uuid) -> Result<BTreeSet<String>, Error> {
        let rows = self
            .connection
            .query(&self.statements.builder_triples, &[&builder])
            .await?;
        rows.into_iter()
            .map(|row| row.try_get("triple_name"))
            .collect()
    }

    /// Returns the names of all triples.
    pub async fn triple_list(&self) -> Result<BTreeSet<String>, Error> {
        let rows = self
            .connection
            .query(&self.statements.triple_list, &[])
            .await?;
        rows.into_iter().map(|row| row.try_get("name")).collect()
    }

    /// Fetches the record of the triple called `triple`.
    pub async fn triple_info(&self, triple: &str) -> Result<TargetInfo, Error> {
        let row = self
            .connection
            .query_one(&self.statements.triple_info, &[&triple])
            .await?;
        Ok(TargetInfo {
            name: row.try_get("name")?,
            enabled: row.try_get("enabled")?,
        })
    }

    /// Lists tasks, optionally filtered by crate, version, task kind and triple.
    ///
    /// A `None` filter is sent as SQL NULL, which the query treats as "match all".
    ///
    /// # Panics
    ///
    /// Panics if a row's `kind` column cannot be parsed into the task kind type.
    pub async fn task_list(
        &self,
        krate: Option<&str>,
        version: Option<&str>,
        task: Option<&str>,
        triple: Option<&str>,
    ) -> Result<Vec<Task>, Error> {
        let rows = self
            .connection
            .query(
                &self.statements.task_list,
                &[&krate, &version, &task, &triple],
            )
            .await?;
        rows.into_iter()
            .map(|row| {
                Ok(Task {
                    krate: row.try_get("crate")?,
                    version: row.try_get("version")?,
                    kind: row
                        .try_get::<&str, &str>("kind")?
                        .parse()
                        .expect("task `kind` column must contain a known task kind"),
                    triple: row.try_get("triple")?,
                    priority: row.try_get("priority")?,
                    variant: row.try_get("variant")?,
                })
            })
            .collect()
    }

    /// Creates a job for `builder` under a freshly generated UUIDv7 and returns
    /// that UUID.
    pub async fn job_request(&self, builder: Uuid) -> Result<Uuid, Error> {
        let row = self
            .connection
            .query_one(&self.statements.job_create, &[&builder, &Uuid::now_v7()])
            .await?;
        row.try_get("uuid")
    }

    /// Fetches the record of the job identified by `job`.
    ///
    /// # Panics
    ///
    /// Panics if the stored `url` column cannot be parsed.
    pub async fn job_info(&self, job: Uuid) -> Result<JobInfo, Error> {
        let row = self
            .connection
            .query_one(&self.statements.job_info, &[&job])
            .await?;
        Ok(JobInfo {
            uuid: row.try_get("uuid")?,
            version: row.try_get("crate_version_version")?,
            name: row.try_get("crate_name")?,
            builder: row.try_get("builder_uuid")?,
            triple: row.try_get("triple_name")?,
            url: row
                .try_get::<_, &str>("url")?
                .parse()
                .expect("job `url` column must contain a valid URL"),
        })
    }

    /// Lists crate names matching `name` according to the `crate_list` statement.
    pub async fn crate_list(&self, name: &str) -> Result<Vec<String>, Error> {
        let rows = self
            .connection
            .query(&self.statements.crate_list, &[&name])
            .await?;
        rows.into_iter().map(|row| row.try_get("name")).collect()
    }

    /// Fetches the record of the crate called `name`.
    pub async fn crate_info(&self, name: &str) -> Result<CrateInfo, Error> {
        let info = self
            .connection
            .query_one(&self.statements.crate_info, &[&name])
            .await?;
        Ok(CrateInfo {
            name: info.try_get("name")?,
            enabled: info.try_get("enabled")?,
        })
    }

    /// Lists the versions known for the crate called `name`.
    pub async fn crate_versions(&self, name: &str) -> Result<Vec<String>, Error> {
        let rows = self
            .connection
            .query(&self.statements.crate_versions, &[&name])
            .await?;
        rows.into_iter().map(|row| row.try_get("version")).collect()
    }

    /// Fetches the record of one specific version of a crate.
    pub async fn crate_version_info(
        &self,
        name: &str,
        version: &str,
    ) -> Result<VersionInfo, Error> {
        let info = self
            .connection
            .query_one(&self.statements.version_info, &[&name, &version])
            .await?;
        Ok(VersionInfo {
            name: info.try_get("name")?,
            version: info.try_get("version")?,
            checksum: info.try_get("checksum")?,
            yanked: info.try_get("yanked")?,
        })
    }
}
/// Boxed, pinned, `Send` stream yielding `Result<AsyncMessage, Error>` items.
pub type ConnectionStream = Pin<Box<dyn Stream<Item = Result<AsyncMessage, Error>> + Send>>;
/// Construction and transaction entry points for a [`Database`] backed by a plain
/// [`Client`].
impl Database<Client> {
    /// Wraps an existing client, preparing all statements on it first.
    pub async fn new(connection: Client) -> Result<Self, Error> {
        let prepared = Statements::prepare(&connection).await?;
        Ok(Self {
            statements: Arc::new(prepared),
            connection,
        })
    }

    /// Connects to `database` without TLS and wraps the resulting client.
    ///
    /// The connection driver is spawned onto the Tokio runtime and not tracked
    /// any further.
    pub async fn connect(database: &str) -> Result<Self, Error> {
        let (client, driver) = connect(database, NoTls).await?;
        tokio::spawn(driver);
        Self::new(client).await
    }

    /// Starts a transaction and returns a [`Database`] bound to it, sharing this
    /// database's prepared statements.
    pub async fn transaction(&mut self) -> Result<Database<Transaction<'_>>, Error> {
        let statements = Arc::clone(&self.statements);
        let connection = self.connection.transaction().await?;
        Ok(Database {
            statements,
            connection,
        })
    }
}
/// Operations that are only available inside a transaction.
impl Database<Transaction<'_>> {
    /// Commits the underlying transaction, consuming this handle.
    pub async fn commit(self) -> Result<(), Error> {
        self.connection.commit().await
    }

    /// Registers a builder: stores its public key (and fingerprints), inserts the
    /// builder row and sets its comment.
    pub async fn builder_add(
        &self,
        uuid: Uuid,
        key: &PublicKey,
        comment: &str,
    ) -> Result<(), Error> {
        let key = self.pubkey_add(key).await?;
        self.builder_register(uuid, key).await?;
        self.builder_set_comment(uuid, comment).await?;
        Ok(())
    }

    /// Stores `pubkey` together with its SHA-256 and SHA-512 fingerprints and
    /// returns the id of the key row.
    ///
    /// The insert statement uses `ON CONFLICT ... DO NOTHING RETURNING id`, which
    /// returns no row when the key is already stored. In that case the existing
    /// row's id is looked up, so adding a known key succeeds instead of failing
    /// with a row-count error.
    ///
    /// # Panics
    ///
    /// Panics if `pubkey` cannot be encoded in OpenSSH format.
    async fn pubkey_add(&self, pubkey: &PublicKey) -> Result<i64, Error> {
        let encoded = pubkey
            .to_openssh()
            .expect("public key must be encodable in OpenSSH format");
        let inserted = self
            .connection
            .query_opt(&self.statements.pubkey_add, &[&encoded])
            .await?;
        let row = match inserted {
            Some(row) => row,
            // Conflict: the key already exists, fetch the id of the existing row.
            None => {
                self.connection
                    .query_one("SELECT id FROM pubkeys WHERE encoded = $1", &[&encoded])
                    .await?
            }
        };
        let id = row.try_get("id")?;
        for alg in [HashAlg::Sha256, HashAlg::Sha512] {
            self.fingerprint_add(id, &pubkey.fingerprint(alg).to_string())
                .await?;
        }
        Ok(id)
    }
}
/// A [`Database`] together with the optional task handle of its connection driver.
#[derive(Debug)]
pub struct DatabaseConnection {
/// Database handle used to run queries.
database: Database,
/// Join handle of the spawned connection task, if one is tracked.
connection: Option<JoinHandle<Result<(), Error>>>,
}
impl DatabaseConnection {
pub fn new(database: Database, connection: Option<JoinHandle<Result<(), Error>>>) -> Self {
Self {
database,
connection,
}
}
pub async fn close(self) -> Result<(), BoxError> {
let Self {
database,
connection,
} = self;
drop(database);
if let Some(connection) = connection {
connection.await??;
}
Ok(())
}
}
/// Pool of database connections backed by a deadpool unmanaged pool.
#[derive(Clone, Debug)]
pub struct Pool {
/// Underlying pool holding the connections.
pool: Deadpool<DatabaseConnection>,
}
impl Pool {
    /// Opens `count` connections to `database` (without TLS) and pools them.
    ///
    /// Each connection driver is spawned as its own task; its join handle is
    /// kept so that [`Pool::close`] can wait for it.
    pub async fn new(database: &str, count: usize) -> Result<Self, Error> {
        let mut databases = vec![];
        for _ in 0..count {
            let (client, connection) = connect(database, NoTls).await?;
            databases.push(DatabaseConnection {
                connection: Some(tokio::spawn(connection)),
                database: Database::new(client).await?,
            });
        }
        Ok(Pool {
            pool: Deadpool::from(databases),
        })
    }

    /// Shuts the pool down, closing every connection that is currently idle.
    ///
    /// Idle connections are drained *before* the pool is closed: once the pool
    /// is closed every removal attempt fails immediately, so draining afterwards
    /// would never close a single connection. Errors from individual connections
    /// are ignored, as shutdown is best-effort.
    pub async fn close(self) {
        while let Ok(conn) = self.pool.try_remove() {
            let _ = conn.close().await;
        }
        self.pool.close();
    }

    /// Checks out a connection for read access.
    pub async fn read(&self) -> Result<Handle, BoxError> {
        let object = self.pool.get().await?;
        Ok(Handle { object })
    }

    /// Checks out a connection and opens a transaction on it.
    pub async fn write(&self) -> Result<Writer, BoxError> {
        let object = self.pool.get().await?;
        // The transaction borrows the checked-out connection, so both are stored
        // together in the self-referencing `Writer`.
        let writer = Writer::try_new_async_send(object, |object: &mut _| {
            Box::pin(async move { object.database.transaction().await.map(Some) })
        })
        .await?;
        Ok(writer)
    }
}
/// Builds a pool holding exactly one connection.
impl From<DatabaseConnection> for Pool {
    fn from(conn: DatabaseConnection) -> Self {
        let single = vec![conn];
        Self {
            pool: Deadpool::from(single),
        }
    }
}
/// Builds a pool from already established connections.
impl From<Vec<DatabaseConnection>> for Pool {
    fn from(conns: Vec<DatabaseConnection>) -> Self {
        let pool = Deadpool::from(conns);
        Self { pool }
    }
}
/// Handle around a connection checked out of a [`Pool`]; dereferences to the
/// connection's [`Database`].
#[derive(Debug)]
pub struct Handle {
/// The pooled connection object.
object: Object<DatabaseConnection>,
}
impl Deref for Handle {
type Target = Database;
fn deref(&self) -> &Self::Target {
&self.object.database
}
}
/// Internal abstraction over handle types that can expose a [`Database`].
trait AsDatabase {
/// Client type of the exposed database. It is generic over a lifetime so that
/// implementors can use a client type that borrows (e.g. a transaction).
type Client<'a>: GenericClient
where
Self: 'a;
/// Returns the database this handle gives access to.
fn database(&self) -> &Database<Self::Client<'_>>;
}
impl AsDatabase for Handle {
type Client<'a> = Client;
fn database(&self) -> &Database<Self::Client<'_>> {
&self.object.database
}
}
// A `Writer` exposes the transaction-backed database it holds.
impl AsDatabase for Writer {
type Client<'a> = Transaction<'a>;
fn database(&self) -> &Database<Self::Client<'_>> {
// `Writer::database` names the inherent method of `Writer` (inherent items take
// precedence over trait items in path resolution), so this is a delegation and
// not a recursive call.
Writer::database(self)
}
}
// Write handle: a pooled connection together with a transaction opened on it.
//
// The transaction borrows the connection mutably, so the struct is
// self-referencing (via ouroboros). The transaction is stored in an `Option` so
// it can be moved out when committing.
#[self_referencing]
pub struct Writer {
// The pooled connection the transaction runs on.
object: Object<DatabaseConnection>,
#[borrows(mut object)]
#[covariant]
// Transaction-backed database; `None` once it has been taken for commit.
transaction: Option<Database<Transaction<'this>>>,
}
impl Writer {
    /// Commits the transaction held by this writer.
    ///
    /// # Panics
    ///
    /// Panics if the transaction has already been taken out of the writer.
    pub async fn commit(mut self) -> Result<(), Error> {
        // Move the transaction out of the self-referencing struct, leaving `None`.
        let pending = self.with_mut(|fields| fields.transaction.take());
        let transaction = pending.unwrap();
        transaction.commit().await?;
        Ok(())
    }

    /// Returns the transaction-backed database.
    ///
    /// # Panics
    ///
    /// Panics if the transaction has already been taken out of the writer.
    fn database(&self) -> &Database<Transaction<'_>> {
        let slot = self.with(|fields| fields.transaction);
        slot.as_ref().unwrap()
    }
}
// Exposes the pool through the `Metadata` trait by boxing its read and write
// handles as trait objects.
#[async_trait::async_trait]
impl Metadata for Pool {
    async fn read(&self) -> Result<Box<dyn ReadHandle>, BoxError> {
        let handle = Pool::read(self).await?;
        Ok(Box::new(handle) as Box<dyn ReadHandle>)
    }

    async fn write(&self) -> Result<Box<dyn WriteHandle>, BoxError> {
        let writer = Pool::write(self).await?;
        Ok(Box::new(writer) as Box<dyn WriteHandle>)
    }
}
// Blanket implementation of `ReadHandle` for every type that can expose a
// `Database` through `AsDatabase`. Each method simply forwards to the method of
// the same name on that database.
#[async_trait::async_trait]
impl<T: AsDatabase + Send + Sync> ReadHandle for T
where
// The client type must be `Send + Sync` for every lifetime it can be
// instantiated with.
for<'a> <T as AsDatabase>::Client<'a>: Send + Sync,
{
async fn builder_lookup(&self, fingerprint: &str) -> Result<Uuid, Error> {
self.database().builder_lookup(fingerprint).await
}
async fn builder_get(&self, builder: Uuid) -> Result<Builder, Error> {
self.database().builder_get(builder).await
}
async fn builder_list(&self) -> Result<Vec<Uuid>, Error> {
self.database().builder_list().await
}
async fn builder_triples(&self, builder: Uuid) -> Result<BTreeSet<String>, Error> {
self.database().builder_triples(builder).await
}
async fn crate_list(&self, name: &str) -> Result<Vec<String>, Error> {
self.database().crate_list(name).await
}
async fn crate_info(&self, name: &str) -> Result<CrateInfo, Error> {
self.database().crate_info(name).await
}
async fn crate_versions(&self, name: &str) -> Result<Vec<String>, Error> {
self.database().crate_versions(name).await
}
async fn job_info(&self, job: Uuid) -> Result<JobInfo, Error> {
self.database().job_info(job).await
}
async fn crate_version_info(&self, name: &str, version: &str) -> Result<VersionInfo, Error> {
self.database().crate_version_info(name, version).await
}
async fn task_list(
&self,
krate: Option<&str>,
version: Option<&str>,
task: Option<&str>,
triple: Option<&str>,
) -> Result<Vec<Task>, Error> {
self.database()
.task_list(krate, version, task, triple)
.await
}
}
// `WriteHandle` for `Writer`: every operation runs on the writer's transaction
// and converts database errors into `BoxError`.
#[async_trait::async_trait]
impl WriteHandle for Writer {
    async fn crate_add(&self, name: &str) -> Result<(), BoxError> {
        let db = self.database();
        db.crate_add(name).await?;
        Ok(())
    }

    async fn crate_version_add(
        &self,
        name: &str,
        version: &str,
        url: &str,
        checksum: &[u8],
        yanked: bool,
    ) -> Result<(), BoxError> {
        // The checksum is passed to the database hex-encoded.
        let checksum_hex = hex::encode(checksum);
        let db = self.database();
        db.crate_version_add(name, version, url, &checksum_hex, yanked)
            .await?;
        Ok(())
    }

    async fn job_request(&self, builder: Uuid) -> Result<Uuid, BoxError> {
        let db = self.database();
        let job = db.job_request(builder).await?;
        Ok(job)
    }

    async fn commit(self: Box<Self>) -> Result<(), BoxError> {
        // Unbox so the inherent, by-value `Writer::commit` is called.
        let inner: Writer = *self;
        inner.commit().await?;
        Ok(())
    }

    async fn builder_add(
        &self,
        uuid: Uuid,
        key: &PublicKey,
        comment: &str,
    ) -> Result<(), BoxError> {
        let db = self.database();
        db.builder_add(uuid, key, comment).await?;
        Ok(())
    }

    async fn builder_set_comment(&self, builder: Uuid, comment: &str) -> Result<(), BoxError> {
        let db = self.database();
        db.builder_set_comment(builder, comment).await?;
        Ok(())
    }

    async fn builder_set_enabled(&self, builder: Uuid, enabled: bool) -> Result<(), BoxError> {
        let db = self.database();
        db.builder_set_enabled(builder, enabled).await?;
        Ok(())
    }

    async fn builder_triple_add(&self, builder: Uuid, triple: &str) -> Result<(), BoxError> {
        let db = self.database();
        // Make sure the triple exists before associating it with the builder.
        db.triple_add(triple).await?;
        db.builder_triple_add(builder, triple).await?;
        Ok(())
    }

    async fn builder_triple_remove(&self, builder: Uuid, triple: &str) -> Result<(), BoxError> {
        let db = self.database();
        db.builder_triple_remove(builder, triple).await?;
        Ok(())
    }
}