Skip to content
This repository has been archived by the owner on Aug 2, 2024. It is now read-only.

Commit

Permalink
feat: add prometheus logs for da layer (#1347)
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorvsadana authored Jan 5, 2024
1 parent 3ecc442 commit a5d5fdf
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Next release

- chore: added prometheus metrics for da layer
- chore: bump celestia rpc crate version
- fix(DA): run the proof first then the state update
- fix: `prove_current_block` is called after `update_state`
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/client/data-availability/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ mp-digest-log = { workspace = true, default-features = true }
mp-hashers = { workspace = true, default-features = true }
mp-storage = { workspace = true, default-features = true }

# Prometheus
prometheus-endpoint = { workspace = true }

# Optional
clap = { workspace = true, features = ["derive"], optional = true }

Expand Down
10 changes: 10 additions & 0 deletions crates/client/data-availability/src/avail/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod config;

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::{anyhow, Result};
Expand All @@ -11,9 +12,14 @@ use avail_subxt::primitives::AvailExtrinsicParams;
use avail_subxt::{api as AvailApi, build_client, AvailConfig};
use ethers::types::{I256, U256};
use futures::lock::Mutex;
use futures::stream::iter;
use jsonrpsee::tracing::Instrument;
use prometheus_endpoint::prometheus::core::Metric;
use prometheus_endpoint::prometheus::proto::LabelPair;
use subxt::ext::sp_core::sr25519::Pair;
use subxt::OnlineClient;

use crate::da_metrics::DaMetrics;
use crate::utils::get_bytes_from_state_diff;
use crate::{DaClient, DaMode};

Expand Down Expand Up @@ -82,6 +88,10 @@ impl DaClient for AvailClient {
fn get_mode(&self) -> DaMode {
self.mode
}

fn get_da_metric_labels(&self) -> HashMap<String, String> {
[("name".into(), "avail".into()), ("app_id".into(), self.app_id.0.to_string())].iter().cloned().collect()
}
}

impl AvailClient {
Expand Down
9 changes: 9 additions & 0 deletions crates/client/data-availability/src/celestia/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub mod config;

use std::collections::HashMap;

use anyhow::Result;
use async_trait::async_trait;
use celestia_rpc::{BlobClient, HeaderClient};
Expand All @@ -8,8 +10,11 @@ use celestia_types::nmt::Namespace;
use celestia_types::{Blob, Result as CelestiaTypesResult};
use ethers::types::{I256, U256};
use jsonrpsee::http_client::{HeaderMap, HeaderValue, HttpClient, HttpClientBuilder};
use prometheus_endpoint::prometheus::core::Metric;
use prometheus_endpoint::prometheus::proto::LabelPair;
use reqwest::header;

use crate::da_metrics::DaMetrics;
use crate::{DaClient, DaMode};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -46,6 +51,10 @@ impl DaClient for CelestiaClient {
fn get_mode(&self) -> DaMode {
self.mode
}

fn get_da_metric_labels(&self) -> HashMap<String, String> {
[("name".into(), "celesia".into())].iter().cloned().collect()
}
}

impl CelestiaClient {
Expand Down
29 changes: 29 additions & 0 deletions crates/client/data-availability/src/da_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use prometheus_endpoint::prometheus::core::AtomicU64;
use prometheus_endpoint::{register, Gauge, Histogram, HistogramOpts, Opts, PrometheusError, Registry};

#[derive(Clone, Debug)]
pub struct DaMetrics {
pub state_updates: Histogram,
pub state_proofs: Histogram,
}

impl DaMetrics {
pub fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
state_updates: register(
Histogram::with_opts(HistogramOpts::new(
"madara_da_state_updates",
"Histogram of time taken for state updates",
))?,
registry,
)?,
state_proofs: register(
Histogram::with_opts(HistogramOpts::new(
"madara_da_state_proofs",
"Histogram of time taken for state proofs",
))?,
registry,
)?,
})
}
}
8 changes: 8 additions & 0 deletions crates/client/data-availability/src/ethereum/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod config;

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Result;
Expand All @@ -8,7 +9,10 @@ use ethers::prelude::{abigen, SignerMiddleware};
use ethers::providers::{Http, Provider};
use ethers::signers::{LocalWallet, Signer};
use ethers::types::{Address, I256, U256};
use prometheus_endpoint::prometheus::core::Metric;
use prometheus_endpoint::prometheus::proto::LabelPair;

use crate::da_metrics::DaMetrics;
use crate::utils::is_valid_http_endpoint;
use crate::{DaClient, DaMode};

Expand Down Expand Up @@ -59,6 +63,10 @@ impl DaClient for EthereumClient {
fn get_mode(&self) -> DaMode {
self.mode
}

fn get_da_metric_labels(&self) -> HashMap<String, String> {
[("name".into(), "ethereum".into())].iter().cloned().collect()
}
}

impl TryFrom<config::EthereumConfig> for EthereumClient {
Expand Down
41 changes: 41 additions & 0 deletions crates/client/data-availability/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ pub mod ethereum;
mod sharp;
pub mod utils;

mod da_metrics;

use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
Expand All @@ -14,12 +18,16 @@ use futures::channel::mpsc;
use futures::StreamExt;
use mc_commitment_state_diff::BlockDAData;
use mp_hashers::HasherT;
use prometheus_endpoint::prometheus::core::AtomicU64;
use prometheus_endpoint::{register, Gauge, Opts, Registry as PrometheusRegistry};
use serde::{Deserialize, Serialize};
use sp_runtime::traits::Block as BlockT;
use starknet_api::block::BlockHash;
use starknet_api::state::ThinStateDiff;
use utils::state_diff_to_calldata;

use crate::da_metrics::DaMetrics;

pub struct DataAvailabilityWorker<B, H>(PhantomData<(B, H)>);

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -68,6 +76,7 @@ pub trait DaClient: Send + Sync {
fn get_mode(&self) -> DaMode;
async fn last_published_state(&self) -> Result<I256>;
async fn publish_state_diff(&self, state_diff: Vec<U256>) -> Result<()>;
fn get_da_metric_labels(&self) -> HashMap<String, String>;
}

/// The client worker for DA related tasks
Expand All @@ -83,29 +92,61 @@ where
{
pub async fn prove_current_block(
da_client: Arc<dyn DaClient + Send + Sync>,
prometheus: Option<PrometheusRegistry>,
mut state_diffs_rx: mpsc::Receiver<BlockDAData>,
madara_backend: Arc<mc_db::Backend<B>>,
) {
let da_metrics = prometheus.as_ref().and_then(|registry| DaMetrics::register(registry).ok());
if let Some(registry) = prometheus.as_ref() {
let gauge = Gauge::<AtomicU64>::with_opts(
Opts::new("madara_da_layer_info", "Information about the data availability layer used")
.const_labels(da_client.get_da_metric_labels()),
);
match gauge {
Ok(gauge) => match register(gauge, registry) {
Ok(_) => (),
Err(e) => {
log::error!("failed to register gauge for da layer info metrics: {e}");
}
},
Err(e) => {
log::error!("failed to create gauge for da layer info metrics: {e}");
}
}
}
while let Some(BlockDAData(starknet_block_hash, csd, num_addr_accessed)) = state_diffs_rx.next().await {
log::info!(
"received state diff for block {starknet_block_hash}: {csd:?}.{num_addr_accessed} addresses accessed."
);

let da_metrics = da_metrics.clone();
let da_client = da_client.clone();
let madara_backend = madara_backend.clone();
tokio::spawn(async move {
let prove_state_start = time::Instant::now();
match prove(da_client.get_mode(), starknet_block_hash, &csd, num_addr_accessed, madara_backend.clone())
.await
{
Err(err) => log::error!("proving error: {err}"),
Ok(()) => {}
}
let prove_state_end = time::Instant::now();

match update_state::<B, H>(madara_backend, da_client, starknet_block_hash, csd, num_addr_accessed).await
{
Err(err) => log::error!("state publishing error: {err}"),
Ok(()) => {}
};
let update_state_end = time::Instant::now();

if let Some(da_metrics) = da_metrics {
da_metrics
.state_proofs
.observe(prove_state_end.saturating_duration_since(prove_state_start).as_secs_f64());
da_metrics
.state_updates
.observe(update_state_end.saturating_duration_since(prove_state_end).as_secs_f64());
}
});
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ pub fn new_full(
Some(MADARA_TASK_GROUP),
DataAvailabilityWorker::<_, StarknetHasher>::prove_current_block(
da_client,
prometheus_registry.clone(),
commitment_state_diff_rx,
madara_backend.clone(),
),
Expand Down

0 comments on commit a5d5fdf

Please sign in to comment.