perf: replace RwLock with ArcSwap

ArcSwap is lock-free and optimized for our use-case of lots of
concurrent reads and infrequent updates.

So it is pretty close to a shared-nothing architecture.

Some small changes were necessary, but its mostly a drop-in
replacement for RwLock.
This commit is contained in:
danda 2024-05-27 17:24:01 -07:00
parent 71cf752b41
commit c2fe657e04
13 changed files with 47 additions and 21 deletions

7
Cargo.lock generated
View File

@ -163,6 +163,12 @@ dependencies = [
"derive_arbitrary", "derive_arbitrary",
] ]
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]] [[package]]
name = "arrayref" name = "arrayref"
version = "0.3.7" version = "0.3.7"
@ -1934,6 +1940,7 @@ name = "neptune-explorer"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"arc-swap",
"axum 0.7.5", "axum 0.7.5",
"boilerplate", "boilerplate",
"chrono", "chrono",

View File

@ -31,6 +31,7 @@ chrono = "0.4.34"
# only should be used inside main.rs, for the binary. # only should be used inside main.rs, for the binary.
anyhow = "1.0.86" anyhow = "1.0.86"
arc-swap = "1.7.1"
[patch.crates-io] [patch.crates-io]
# 694f27daf78aade0ed0dc07e3babaab036cd5572 is tip of branch: master as of 2024-04-30 # 694f27daf78aade0ed0dc07e3babaab036cd5572 is tip of branch: master as of 2024-04-30

View File

@ -15,7 +15,7 @@ pub async fn send(
subject: &str, subject: &str,
body: String, body: String,
) -> std::result::Result<bool, anyhow::Error> { ) -> std::result::Result<bool, anyhow::Error> {
match state.read().await.config.alert_config() { match state.load().config.alert_config() {
None => { None => {
warn!("Alert emails disabled. alert not sent. consider confiuring smtp parameters. subject: {subject}"); warn!("Alert emails disabled. alert not sent. consider confiuring smtp parameters. subject: {subject}");
Ok(false) Ok(false)

View File

@ -24,7 +24,7 @@ pub async fn block_page(
header: HeaderHtml<'a>, header: HeaderHtml<'a>,
block_info: BlockInfo, block_info: BlockInfo,
} }
let state = &*state_rw.read().await; let state = &state_rw.load();
let Path(block_selector) = let Path(block_selector) =
user_input_maybe.map_err(|e| not_found_html_response(state, Some(e.to_string())))?; user_input_maybe.map_err(|e| not_found_html_response(state, Some(e.to_string())))?;

View File

@ -76,7 +76,7 @@ pub async fn redirect_query_string_to_path(
RawQuery(raw_query_option): RawQuery, RawQuery(raw_query_option): RawQuery,
State(state_rw): State<Arc<AppState>>, State(state_rw): State<Arc<AppState>>,
) -> Result<Response, Response> { ) -> Result<Response, Response> {
let state = &*state_rw.read().await; let state = &state_rw.load();
let not_found = || not_found_html_response(state, None); let not_found = || not_found_html_response(state, None);

View File

@ -18,7 +18,7 @@ pub async fn root(State(state_rw): State<Arc<AppState>>) -> Result<Html<String>,
state: &'a AppStateInner, state: &'a AppStateInner,
} }
let state = &*state_rw.read().await; let state = &state_rw.load();
let tip_height = state let tip_height = state
.rpc_client .rpc_client

View File

@ -25,7 +25,7 @@ pub async fn utxo_page(
digest: Digest, digest: Digest,
} }
let state = &*state_rw.read().await; let state = &state_rw.load();
let Path(index) = let Path(index) =
index_maybe.map_err(|e| not_found_html_response(state, Some(e.to_string())))?; index_maybe.map_err(|e| not_found_html_response(state, Some(e.to_string())))?;

View File

@ -28,7 +28,7 @@ async fn main() -> Result<(), anyhow::Error> {
let routes = setup_routes(app_state.clone()); let routes = setup_routes(app_state.clone());
let port = app_state.read().await.config.listen_port; let port = app_state.load().config.listen_port;
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{port}")) let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{port}"))
.await .await
.with_context(|| format!("Failed to bind to port {port}"))?; .with_context(|| format!("Failed to bind to port {port}"))?;

View File

@ -1,13 +1,13 @@
use crate::model::config::Config; use crate::model::config::Config;
use crate::neptune_rpc; use crate::neptune_rpc;
use anyhow::Context; use anyhow::Context;
use arc_swap::ArcSwap;
use clap::Parser; use clap::Parser;
use neptune_core::config_models::network::Network; use neptune_core::config_models::network::Network;
use neptune_core::models::blockchain::block::block_selector::BlockSelector; use neptune_core::models::blockchain::block::block_selector::BlockSelector;
use neptune_core::prelude::twenty_first::math::digest::Digest; use neptune_core::prelude::twenty_first::math::digest::Digest;
use neptune_core::rpc_server::RPCClient; use neptune_core::rpc_server::RPCClient;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock;
pub struct AppStateInner { pub struct AppStateInner {
pub network: Network, pub network: Network,
@ -17,10 +17,10 @@ pub struct AppStateInner {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct AppState(Arc<RwLock<AppStateInner>>); pub struct AppState(Arc<ArcSwap<AppStateInner>>);
impl std::ops::Deref for AppState { impl std::ops::Deref for AppState {
type Target = Arc<RwLock<AppStateInner>>; type Target = Arc<ArcSwap<AppStateInner>>;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.0 &self.0
@ -31,7 +31,7 @@ impl From<(Network, Config, RPCClient, Digest)> for AppState {
fn from( fn from(
(network, config, rpc_client, genesis_digest): (Network, Config, RPCClient, Digest), (network, config, rpc_client, genesis_digest): (Network, Config, RPCClient, Digest),
) -> Self { ) -> Self {
Self(Arc::new(RwLock::new(AppStateInner { Self(Arc::new(ArcSwap::from_pointee(AppStateInner {
network, network,
config, config,
rpc_client, rpc_client,
@ -62,4 +62,27 @@ impl AppState {
genesis_digest, genesis_digest,
))) )))
} }
/// Sets the rpc_client
///
/// This method exists because it is sometimes necessary
/// to re-establish connection to the neptune RPC server.
///
/// This is achieved via ArcSwap which is faster than
/// RwLock for our use-case that is heavy reads and few
/// if any mutations. ArcSwap is effectively lock-free.
///
/// Note that this method takes &self, so interior
/// mutability occurs.
pub fn set_rpc_client(&self, rpc_client: RPCClient) {
let inner = self.0.load();
let new_inner = AppStateInner {
rpc_client,
network: inner.network,
config: inner.config.clone(),
genesis_digest: inner.genesis_digest,
};
self.0.store(Arc::new(new_inner));
}
} }

View File

@ -42,7 +42,7 @@ pub async fn watchdog(app_state: AppState) {
let app_started = chrono::offset::Utc::now(); let app_started = chrono::offset::Utc::now();
let mut was_connected = true; let mut was_connected = true;
let mut since = chrono::offset::Utc::now(); let mut since = chrono::offset::Utc::now();
let watchdog_secs = app_state.read().await.config.neptune_rpc_watchdog_secs; let watchdog_secs = app_state.load().config.neptune_rpc_watchdog_secs;
debug!("neptune-core rpc watchdog started"); debug!("neptune-core rpc watchdog started");
@ -50,8 +50,7 @@ pub async fn watchdog(app_state: AppState) {
tokio::time::sleep(tokio::time::Duration::from_secs(watchdog_secs)).await; tokio::time::sleep(tokio::time::Duration::from_secs(watchdog_secs)).await;
let result = app_state let result = app_state
.read() .load()
.await
.rpc_client .rpc_client
.network(context::current()) .network(context::current())
.await; .await;
@ -94,8 +93,7 @@ pub async fn watchdog(app_state: AppState) {
if !now_connected { if !now_connected {
if let Ok(c) = gen_rpc_client().await { if let Ok(c) = gen_rpc_client().await {
let mut state = app_state.write().await; app_state.set_rpc_client(c);
state.rpc_client = c;
} }
} }
} }

View File

@ -16,8 +16,7 @@ pub async fn block_digest(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
) -> Result<Json<Digest>, impl IntoResponse> { ) -> Result<Json<Digest>, impl IntoResponse> {
match state match state
.read() .load()
.await
.rpc_client .rpc_client
.block_digest(context::current(), selector.into()) .block_digest(context::current(), selector.into())
.await .await

View File

@ -16,8 +16,7 @@ pub async fn block_info(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
) -> Result<Json<BlockInfo>, Response> { ) -> Result<Json<BlockInfo>, Response> {
let block_info = state let block_info = state
.read() .load()
.await
.rpc_client .rpc_client
.block_info(context::current(), selector.into()) .block_info(context::current(), selector.into())
.await .await

View File

@ -17,8 +17,7 @@ pub async fn utxo_digest(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
) -> Result<Json<Digest>, impl IntoResponse> { ) -> Result<Json<Digest>, impl IntoResponse> {
match state match state
.read() .load()
.await
.rpc_client .rpc_client
.utxo_digest(context::current(), index) .utxo_digest(context::current(), index)
.await .await