Commit e6ffde7d authored by Corentin Noël
Browse files

Add running_jobs_seconds gauge



Reports, in seconds, how long each currently running job has been running.
Signed-off-by: Corentin Noël <corentin.noel@collabora.com>
parent aeec46b4
......@@ -8,6 +8,7 @@ edition = "2018"
[dependencies]
anyhow = "1"
chrono = "0.4"
structopt = "0.3"
serde = { version = "1", features = [ "derive"] }
serde_yaml = "0.8"
......
......@@ -27,6 +27,13 @@ pub struct Job {
pub priority: i64,
}
/// Snapshot of a single job in the running state, as built by `running_jobs`.
#[derive(Debug, Clone)]
pub struct RunningJob {
/// Device type the job requested.
pub requested_device_type: String,
/// Time elapsed between the job's start time and the moment the job was
/// read from the query (computed against `chrono::Utc::now()`).
pub duration: chrono::Duration,
/// Device the job is running on; `running_jobs` stores `"unknown_device"`
/// when the job reports no actual device.
pub actual_device: String,
}
#[derive(Debug)]
struct CacheInner {
update: Option<Instant>,
......@@ -138,4 +145,22 @@ impl LavaCache {
Ok(jobs)
}
/// Collects every job that is currently in the running state.
///
/// Each returned [`RunningJob`] carries the requested device type, the
/// device the job actually runs on (or `"unknown_device"` when none is
/// reported) and the time elapsed since the job's start time. Jobs without
/// a start time are skipped, since no duration can be computed for them.
///
/// # Errors
///
/// Returns an error as soon as fetching the next job from the query fails.
pub async fn running_jobs(&self) -> Result<Vec<RunningJob>> {
    let mut jobs = Vec::new();
    // We are using an artificially high limit here to reduce the number of requests.
    let mut lj = self.lava.jobs().limit(2000).state(job::State::Running).query();
    while let Some(j) = lj.try_next().await? {
        if let Some(start_time) = j.start_time {
            jobs.push(RunningJob {
                // `j` is owned, so its fields can be moved out instead of cloned.
                requested_device_type: j.requested_device_type,
                duration: chrono::Utc::now() - start_time,
                // Build the fallback lazily so it is only allocated when the
                // job has no actual device.
                actual_device: j
                    .actual_device
                    .unwrap_or_else(|| "unknown_device".to_string()),
            });
        }
    }
    Ok(jobs)
}
}
......@@ -11,6 +11,7 @@ pub struct Metrics {
tags: GaugeVec,
workers: GaugeVec,
queue_size: GaugeVec,
running_job_seconds: GaugeVec,
}
// Implements warp's `Reject` marker trait for `AnyhowRejection` (no methods to provide).
impl warp::reject::Reject for AnyhowRejection {}
......@@ -37,12 +38,17 @@ impl Metrics {
let queue_size = register_gauge_vec!(opts, &["device_type", "submitter", "priority"])
.context("Failed to register device queue gauges")?;
let opts = opts!("lava_running_job_seconds", "Time in seconds of the current running jobs");
let running_job_seconds = register_gauge_vec!(opts, &["device_type", "actual_device"])
.context("Failed to register running jobs gauges")?;
Ok(Metrics {
lava,
devices,
tags,
workers,
queue_size,
running_job_seconds,
})
}
......@@ -100,6 +106,17 @@ impl Metrics {
.inc();
}
self.running_job_seconds.reset();
let jobs = self.lava.running_jobs().await?;
for j in jobs {
self.running_job_seconds
.with_label_values(&[
&j.requested_device_type,
&j.actual_device,
])
.set(j.duration.num_seconds() as f64);
}
Ok(())
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment