1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
//! # Filesystem Storage
//!
//! This storage implementation uses the filesystem to store artifacts.

use super::*;
use std::{
    fmt::Debug,
    io::ErrorKind,
    path::{Path, PathBuf},
};
use tokio::{fs::OpenOptions, io::AsyncWriteExt};

// Compiled only in test builds or when the `temp` feature is enabled.
#[cfg(any(test, feature = "temp"))]
mod temp;

// Compiled only when the `options` feature is enabled.
#[cfg(feature = "options")]
mod options;

// Crate-internal re-export of `FilesystemOptions`, gated on the same feature
// as the module that declares it.
#[cfg(feature = "options")]
pub(crate) use options::FilesystemOptions;

/// Filesystem-backed storage for artifacts.
///
/// Generic over the type `P` used to hold the base path; any type that can be
/// viewed as a [`Path`] works, and [`PathBuf`] is used when none is specified.
#[derive(Clone, Debug)]
pub struct Filesystem<P: AsRef<Path> = PathBuf> {
    /// Base path of this storage instance.
    path: P,
}

/// Error interacting with the filesystem.
#[derive(thiserror::Error, Debug)]
#[error("error writing to {path:?}")]
pub struct FilesystemError {
    /// Path that was being written to or read from.
    path: PathBuf,
    /// Error that occured.
    #[source]
    error: std::io::Error,
}

impl<P: AsRef<Path>> Filesystem<P> {
    /// Create new Filesystem storage instance.
    pub fn new(path: P) -> Self {
        Self { path }
    }

    /// Get the base path of this filesystem storage instance.
    pub fn path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Get the full path where an artifact ID might be stored.
    pub fn artifact_path(&self, version: &ArtifactId) -> PathBuf {
        self.path().join(version.file_name())
    }

    async fn do_artifact_put(
        &self,
        version: &ArtifactId,
        data: &[u8],
    ) -> Result<(), FilesystemError> {
        let path = self.artifact_path(version);
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)
            .await
            .map_err(|error| FilesystemError {
                path: path.clone(),
                error,
            })?;
        file.write_all(data)
            .await
            .map_err(|error| FilesystemError {
                path: path.clone(),
                error,
            })?;
        file.flush()
            .await
            .map_err(|error| FilesystemError { path, error })?;
        Ok(())
    }

    async fn do_artifact_get(&self, version: &ArtifactId) -> Result<Bytes, FilesystemError> {
        let path = self.artifact_path(version);
        tokio::fs::read(&path)
            .await
            .map(Into::into)
            .map_err(|error| FilesystemError { path, error })
    }
}

#[async_trait::async_trait]
impl<P: AsRef<Path> + Send + Sync + Debug> Storage for Filesystem<P> {
    /// Store `data` for the artifact `version`.
    ///
    /// Every filesystem failure is surfaced as [`StorageError::Other`].
    async fn artifact_put(&self, version: &ArtifactId, data: &[u8]) -> Result<(), StorageError> {
        match self.do_artifact_put(version, data).await {
            Ok(()) => Ok(()),
            Err(failure) => Err(StorageError::Other(Arc::new(failure))),
        }
    }

    /// Fetch the stored data for the artifact `version`.
    ///
    /// A missing file is reported as [`StorageError::NotFound`]; every other
    /// filesystem failure is reported as [`StorageError::Other`].
    async fn artifact_get(&self, version: &ArtifactId) -> Result<ArtifactData, StorageError> {
        self.do_artifact_get(version)
            .await
            .map(|bytes| ArtifactData::Data { bytes })
            .map_err(|failure| {
                // Classify by the underlying I/O error kind before wrapping.
                let missing = failure.error.kind() == ErrorKind::NotFound;
                let shared = Arc::new(failure);
                if missing {
                    StorageError::NotFound(shared)
                } else {
                    StorageError::Other(shared)
                }
            })
    }
}