Commit 197b40e7 authored by Corentin Noël's avatar Corentin Noël
Browse files

Add support for jobs



This allows to get the all the jobs from the LAVA instance but also add an API
to filter them out by state so that we can easily query for instance queued jobs
or running ones.
Signed-off-by: Corentin Noël's avatarCorentin Noël <corentin.noel@collabora.com>
parent 0370bb73
......@@ -8,6 +8,7 @@ license = "MIT OR Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
serde = { version = "^1.0.97", features = ["derive"] }
serde_json = "^1"
......
use anyhow::Error;
use lava_api::device;
use lava_api::job;
use lava_api::worker::{self, Worker};
use lava_api::Lava;
use structopt::StructOpt;
......@@ -70,5 +71,22 @@ async fn main() -> Result<(), Error> {
println!(" {} {}", worker_to_emoji(&w), w.hostname);
}
println!("\nQueued Jobs:");
let mut jobs = l.jobs().state(job::State::Submitted).query();
let mut num = 10;
while let Some(w) = jobs.try_next().await? {
println!(" 💤️ [{}] {}", w.id, w.description);
num = num - 1;
if num == 0 {
match jobs.reported_items() {
Some(n) => println!("\n…and {} more jobs", n-10),
None => println!("\n…and an unknown amount of jobs")
};
break;
}
}
Ok(())
}
use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use futures::stream::{self, StreamExt, Stream};
use futures::FutureExt;
use serde::Deserialize;
use std::convert::TryFrom;
use std::pin::Pin;
use std::task::{Context, Poll};
use thiserror::Error;
use std::fmt;
use crate::paginator::{PaginationError, Paginator};
use crate::tag::Tag;
use crate::Lava;
#[derive(Copy, Deserialize, Clone, Debug, PartialEq)]
#[serde(try_from = "&str")]
pub enum State {
Submitted,
Scheduling,
Scheduled,
Running,
Canceling,
Finished,
}
impl fmt::Display for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
State::Submitted => write!(f, "Submitted"),
State::Scheduling => write!(f, "Scheduling"),
State::Scheduled => write!(f, "Scheduled"),
State::Running => write!(f, "Running"),
State::Canceling => write!(f, "Canceling"),
State::Finished => write!(f, "Finished"),
}
}
}
#[derive(Clone, Debug, Error)]
#[error("Failed to convert into State")]
pub struct TryFromStateError {}
impl TryFrom<&str> for State {
type Error = TryFromStateError;
fn try_from(v: &str) -> Result<Self, Self::Error> {
match v {
"Submitted" => Ok(State::Submitted),
"Scheduling" => Ok(State::Scheduling),
"Scheduled" => Ok(State::Scheduled),
"Running" => Ok(State::Running),
"Canceling" => Ok(State::Canceling),
"Finished" => Ok(State::Finished),
_ => Err(TryFromStateError {}),
}
}
}
#[derive(Copy, Deserialize, Clone, Debug, PartialEq)]
#[serde(try_from = "&str")]
pub enum Health {
Unknown,
Complete,
Incomplete,
Canceled,
}
#[derive(Clone, Debug, Error)]
#[error("Failed to convert into Health")]
pub struct TryFromHealthError {}
impl TryFrom<&str> for Health {
type Error = TryFromHealthError;
fn try_from(v: &str) -> Result<Self, Self::Error> {
match v {
"Unknown" => Ok(Health::Unknown),
"Complete" => Ok(Health::Complete),
"Incomplete" => Ok(Health::Incomplete),
"Canceled" => Ok(Health::Canceled),
_ => Err(TryFromHealthError {}),
}
}
}
#[derive(Clone, Deserialize, Debug)]
struct LavaJob {
id: i64,
submitter: String,
viewing_groups: Vec<String>,
description: String,
health_check: bool,
requested_device_type: String,
tags: Vec<u32>,
actual_device: Option<String>,
submit_time: DateTime<Utc>,
start_time: Option<DateTime<Utc>>,
end_time: Option<DateTime<Utc>>,
state: State,
health: Health,
priority: i64,
definition: String,
original_definition: String,
multinode_definition: String,
failure_tags: Vec<u32>,
failure_comment: Option<String>,
}
#[derive(Clone, Debug)]
pub struct Job {
pub id: i64,
pub submitter: String,
pub viewing_groups: Vec<String>,
pub description: String,
pub health_check: bool,
pub requested_device_type: String,
pub tags: Vec<Tag>,
pub actual_device: Option<String>,
pub submit_time: DateTime<Utc>,
pub start_time: Option<DateTime<Utc>>,
pub end_time: Option<DateTime<Utc>>,
pub state: State,
pub health: Health,
pub priority: i64,
pub definition: String,
pub original_definition: String,
pub multinode_definition: String,
pub failure_tags: Vec<Tag>,
pub failure_comment: Option<String>,
}
enum PagingState<'a> {
Paging,
Transforming(BoxFuture<'a, Job>),
}
pub struct Jobs<'a> {
lava: &'a Lava,
paginator: Paginator<LavaJob>,
state: PagingState<'a>,
}
impl<'a> Jobs<'a> {
pub fn reported_items(&self) -> Option<u32> {
self.paginator.reported_items ()
}
}
pub struct JobsBuilder<'a> {
lava: &'a Lava,
state: Option<State>
}
impl<'a> JobsBuilder<'a> {
pub fn new(lava: &'a Lava) -> Self {
Self {
lava,
state: None,
}
}
pub fn state(mut self, state: State) -> Self {
self.state = Some(state);
self
}
pub fn query(self) -> Jobs<'a> {
let mut query = String::from("jobs/?ordering=id");
match self.state {
Some(state) => {
query.push_str(";state=");
query.push_str(&state.to_string());
},
_ => {}
};
let paginator = Paginator::new(
self.lava.client.clone(),
&self.lava.base,
&query,
);
Jobs {
lava: self.lava,
paginator,
state: PagingState::Paging,
}
}
}
async fn transform_job(job: LavaJob, lava: &Lava) -> Job {
let t = stream::iter(job.tags.iter());
let tags = t
.filter_map(|i| async move { lava.tag(*i).await })
.collect()
.await;
let t = stream::iter(job.failure_tags.iter());
let failure_tags = t
.filter_map(|i| async move { lava.tag(*i).await })
.collect()
.await;
Job {
id: job.id,
submitter: job.submitter,
viewing_groups: job.viewing_groups,
description: job.description,
health_check: job.health_check,
requested_device_type: job.requested_device_type,
tags,
actual_device: job.actual_device,
submit_time: job.submit_time,
start_time: job.start_time,
end_time: job.end_time,
state: job.state,
health: job.health,
priority: job.priority,
definition: job.definition,
original_definition: job.original_definition,
multinode_definition: job.multinode_definition,
failure_tags,
failure_comment: job.failure_comment,
}
}
impl<'a> Stream for Jobs<'a> {
type Item = Result<Job, PaginationError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let me = self.get_mut();
loop {
return match &mut me.state {
PagingState::Paging => {
let p = Pin::new(&mut me.paginator);
match p.poll_next(cx) {
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(d))) => {
me.state = PagingState::Transforming(transform_job(d, me.lava).boxed());
continue;
}
Poll::Pending => Poll::Pending,
}
}
PagingState::Transforming(fut) => match fut.as_mut().poll(cx) {
Poll::Ready(d) => {
me.state = PagingState::Paging;
Poll::Ready(Some(Ok(d)))
}
Poll::Pending => Poll::Pending,
},
};
}
}
}
......@@ -2,6 +2,7 @@ pub mod device;
mod paginator;
pub mod tag;
pub mod worker;
pub mod job;
use futures::stream::TryStreamExt;
use log::debug;
......@@ -12,6 +13,7 @@ use tokio::sync::RwLock;
use url::Url;
use device::Devices;
use job::JobsBuilder;
use paginator::{PaginationError, Paginator};
use tag::Tag;
use thiserror::Error;
......@@ -93,6 +95,10 @@ impl Lava {
Devices::new(self)
}
pub fn jobs(&self) -> JobsBuilder {
JobsBuilder::new(self)
}
pub fn workers(&self) -> Paginator<Worker> {
Paginator::new(self.client.clone(), &self.base, "workers/")
}
......
......@@ -27,7 +27,7 @@ pub enum PaginationError {
#[derive(Deserialize, Debug)]
struct PaginatedReply<T> {
count: i32,
count: u32,
next: Option<String>,
results: VecDeque<T>,
}
......@@ -41,6 +41,7 @@ enum State<T> {
pub struct Paginator<T> {
client: Client,
next: State<T>,
count: Option<u32>,
}
impl<T> Paginator<T>
......@@ -56,7 +57,7 @@ where
.boxed(),
);
Paginator { client, next }
Paginator { client, next, count: None }
}
async fn get(client: Client, uri: Url) -> Result<PaginatedReply<T>, PaginationError>
......@@ -102,6 +103,7 @@ where
fn next_data(&mut self) -> Result<Option<T>, PaginationError> {
if let State::Data(d) = &mut self.next {
self.count = Some(d.count);
if let Some(data) = d.results.pop_front() {
return Ok(Some(data));
}
......@@ -119,6 +121,10 @@ where
}
Ok(None)
}
pub fn reported_items(&self) -> Option<u32> {
self.count
}
}
impl<T> Stream for Paginator<T>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment