1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
extern crate rand as rnd;

use std::collections::HashMap;
use std::sync::mpsc::{Receiver, SyncSender};
use std::thread::sleep;
use std::time::Duration;
use self::rnd::{thread_rng, Rng};

use processor::{InputProcessor, ConfigurableFilter};

struct StringField;
struct UInt32Field;

trait Randomizable {
  fn generate(&self) -> String;
}

impl Randomizable for StringField {
  fn generate(&self) -> String {
    let s:String = thread_rng().gen_ascii_chars().take(10).collect();
    s
  }
}

impl Randomizable for UInt32Field {
  fn generate(&self) -> String {
    format!("{}", thread_rng().gen::<u32>())
  }
}


pub struct Random {
  name: String
}

/// # Random input
///
/// - generate fake input according to column definitions
///
/// ### catapult.conf
///
/// ```
/// input {
///     random {
///         fieldlist = "id:id,content:str"
///         rate = 3
///     }
/// }
/// ```
/// ### Parameters
///
/// - **rate**: Number of messages per second
/// - **fieldList**: comma separated list of field name : type
///
/// ### Supported type
///
/// - String for now. All other types use string.


impl Random {
    pub fn new(name: String) -> Random {
    Random{ name: name }
  }
}

fn typeize(f: &str) -> Box<Randomizable> {
  let definition: Vec<&str> = f.split(":").collect();
  match definition[1] {
    "u32" => Box::new(UInt32Field) as Box<Randomizable>,
    _ => Box::new(StringField) as Box<Randomizable>
  }
}

impl ConfigurableFilter for Random {
  fn human_name(&self) -> &str {
    self.name.as_ref()
  }
  fn mandatory_fields(&self) -> Vec<&str> {
    vec!["fieldlist", "rate"]
  }

}

impl InputProcessor for Random {
  fn start(&self, config: &Option<HashMap<String,String>>) -> Receiver<String> {
    self.requires_fields(config, self.mandatory_fields());
    self.invoke(config, Random::handle_func)
  }
  fn handle_func(tx: SyncSender<String>, config: Option<HashMap<String,String>>) {
    let conf = config.unwrap();
    let rate = conf.get("rate").unwrap().clone();

    let sleep_duration: u32 = (1000.0f32 / rate.parse::<f32>().unwrap()) as u32;
    println!("Random input will sleep for {}", sleep_duration);

    let fields: Vec<Box<Randomizable>> = conf.get("fieldlist").unwrap().split(",").map(move |f| typeize(f)).collect();

    loop {
      let duration = Duration::new(sleep_duration as u64, 0);
      sleep(duration);
      let mut l = Vec::new();
      for f in &fields {
        l.push(f.generate());
      }
      let line = l.join("\t");
      match tx.try_send(line.clone()) {
        Ok(()) => {},
        Err(e) => {
          println!("Unable to send line to processor: {}", e);
          println!("{}", line)
        }
      }
    }
  }
}