1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
//! # Storage Cache
//!
//! This implements a caching layer that can wrap any storage provider. Artifact data is kept
//! until it is evicted, while negative lookups and redirects are only cached for a limited time.

use super::*;
use moka::{future::Cache as MokaCache, Expiry};
use std::{
    sync::Arc,
    time::{Duration, Instant},
};

#[cfg(feature = "options")]
mod options;

#[cfg(feature = "options")]
pub(crate) use options::CacheOptions;

/// Cached error.
///
/// Newtype around a [`SharedError`] that adds context to the error it contains, to communicate
/// to users that this error may have been served from the cache rather than produced by a fresh
/// lookup.
///
/// The display message is always `cached error`; the wrapped error is attached via `#[from]`.
#[derive(Debug, thiserror::Error)]
#[error("cached error")]
struct CachedError(#[from] SharedError);

/// Cache entry for one artifact lookup.
///
/// Both outcomes of a lookup are stored, so that repeated requests for an artifact that does not
/// exist can be answered from the cache as well.
#[derive(Clone, Debug)]
enum Entry {
    /// Artifact is missing. Holds the not-found error reported by the storage, wrapped in a
    /// [`CachedError`].
    Missing(Arc<CachedError>),
    /// Artifact exists. Holds the data returned by the storage.
    Data(ArtifactData),
}

impl Entry {
    /// Compute how much of the cache capacity this entry occupies.
    ///
    /// Entries that carry artifact bytes weigh as much as the length of those bytes. Every other
    /// kind of entry (missing artifacts and data variants without inline bytes) weighs `1`.
    fn weight(&self) -> usize {
        if let Self::Data(ArtifactData::Data { bytes }) = self {
            return bytes.len();
        }

        1
    }
}

/// Tunables for the storage [`Cache`].
///
/// Use this to adjust how the cache behaves. Starting from [`CacheConfig::default()`] is
/// recommended. The setting you will most likely want to change is `capacity`, which should
/// reflect how much memory you are willing to dedicate to the cache.
#[derive(Clone, Copy, Debug)]
pub struct CacheConfig {
    /// Capacity of the cache, in bytes.
    ///
    /// Set this to the amount of memory the cache is allowed to use. Larger values generally
    /// give better hit rates. Defaults to 16 MiB.
    pub capacity: u64,

    /// How long a negative (missing artifact) lookup stays cached.
    ///
    /// Published packages are immutable, so their data never needs to expire. A negative lookup,
    /// however, becomes stale as soon as the artifact is published, so it gets its own expiry
    /// which should be kept short.
    pub timeout_missing: Duration,
}

impl Default for CacheConfig {
    /// Build the default configuration: 16 MiB of capacity, negative lookups kept for 60 seconds.
    fn default() -> Self {
        const MEBIBYTE: u64 = 1024 * 1024;

        let capacity = 16 * MEBIBYTE;
        let timeout_missing = Duration::from_secs(60);

        Self {
            capacity,
            timeout_missing,
        }
    }
}

impl Expiry<ArtifactId, Entry> for CacheConfig {
    /// Decide how long a freshly inserted entry may live.
    ///
    /// - Missing artifacts expire after the configured `timeout_missing`.
    /// - Redirects expire after their own `validity`.
    /// - Any other data returns `None`, i.e. no per-entry expiry is requested.
    fn expire_after_create(
        &self,
        _key: &ArtifactId,
        value: &Entry,
        _created_at: Instant,
    ) -> Option<Duration> {
        let lifetime = match value {
            Entry::Data(ArtifactData::Redirect { validity, .. }) => *validity,
            Entry::Missing(_) => self.timeout_missing,
            // plain data carries no time limit of its own.
            Entry::Data(_) => return None,
        };

        Some(lifetime)
    }
}

/// Storage caching layer.
///
/// This is a layer you can use to wrap an existing storage provider to add an in-memory cache.
/// This allows you to serve commonly requested artifacts more efficiently.
///
/// The cache is implemented using the moka crate, which is designed for highly concurrent
/// access. Entries are keyed by [`ArtifactId`].
#[derive(Clone, Debug)]
pub struct Cache {
    /// Underlying storage implementation that lookups are forwarded to.
    storage: AnyStorage,
    /// Cache of lookup results (both found and missing artifacts), keyed by artifact.
    cache: MokaCache<ArtifactId, Entry>,
}

impl Cache {
    /// Wrap `storage` in an in-memory cache configured by `config`.
    ///
    /// The [`CacheConfig`] determines the capacity of the cache and how long entries may live.
    /// [`CacheConfig::default()`] gives reasonable defaults; see the documentation of
    /// [`CacheConfig`] for what can be tuned.
    pub fn new(storage: AnyStorage, config: CacheConfig) -> Self {
        // Entries are weighed by their size in bytes, so that the capacity bounds memory use.
        // The weigher has to return a `u32`, so an entry whose weight does not fit (more than
        // 4GB) saturates to the maximum value.
        let cache = MokaCache::builder()
            .weigher(|_key, entry: &Entry| u32::try_from(entry.weight()).unwrap_or(u32::MAX))
            .max_capacity(config.capacity)
            .expire_after(config)
            .build();

        Self { storage, cache }
    }

    /// Borrow the storage that this cache wraps.
    pub fn storage(&self) -> &AnyStorage {
        &self.storage
    }

    /// Drop everything from the cache.
    ///
    /// All existing cache entries are invalidated.
    pub fn clear(&self) {
        self.cache.invalidate_all();
    }
}

#[async_trait::async_trait]
impl Storage for Cache {
    /// Store an artifact in the underlying storage.
    ///
    /// Writes are never answered from the cache; they are forwarded unchanged. After a successful
    /// write, any cached entry for `version` is invalidated. Without this, a negative lookup that
    /// was cached before the artifact was published would keep reporting it as missing until
    /// `timeout_missing` elapses.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying storage reports. The cache is left untouched in that
    /// case.
    async fn artifact_put(&self, version: &ArtifactId, data: &[u8]) -> Result<(), StorageError> {
        self.storage().artifact_put(version, data).await?;

        // the artifact exists now, so a cached "missing" entry for it is stale.
        self.cache.invalidate(version).await;

        Ok(())
    }

    /// Fetch an artifact, answering from the cache when possible.
    ///
    /// On a cache miss, the underlying storage is queried and the outcome is cached: found
    /// artifacts as data, not-found results as a [`CachedError`]. Other errors are passed
    /// through to the caller.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::NotFound`] (wrapping a [`CachedError`]) when the artifact is
    /// missing, or the underlying storage's error for any other failure.
    async fn artifact_get(&self, version: &ArtifactId) -> Result<ArtifactData, StorageError> {
        let storage = self.storage.clone();
        // using try_get_with here ensures that if we concurrently request the same artifact version
        // twice, only one lookup will be made.
        let result = self
            .cache
            .try_get_with(version.clone(), async move {
                match storage.artifact_get(version).await {
                    Ok(data) => Ok(Entry::Data(data)),
                    Err(StorageError::NotFound(error)) => {
                        // we save the error, but wrap it in a CachedError, to communicate to
                        // the caller that this error may have been cached.
                        Ok(Entry::Missing(Arc::new(CachedError(error))))
                    }
                    Err(error) => Err(error),
                }
            })
            .await;

        // depending on what the entry is, we construct the right response.
        match result {
            Ok(Entry::Data(data)) => Ok(data),
            Ok(Entry::Missing(error)) => Err(StorageError::NotFound(error)),
            // the error arrives behind an `Arc`, so hand out a copy of it.
            Err(error) => Err((*error).clone()),
        }
    }
}