1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
extern crate rand as rnd;
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, SyncSender};
use std::thread::sleep;
use std::time::Duration;
use self::rnd::{thread_rng, Rng};
use processor::{InputProcessor, ConfigurableFilter};
struct StringField;
struct UInt32Field;
trait Randomizable {
fn generate(&self) -> String;
}
impl Randomizable for StringField {
fn generate(&self) -> String {
let s:String = thread_rng().gen_ascii_chars().take(10).collect();
s
}
}
impl Randomizable for UInt32Field {
fn generate(&self) -> String {
format!("{}", thread_rng().gen::<u32>())
}
}
pub struct Random {
name: String
}
impl Random {
pub fn new(name: String) -> Random {
Random{ name: name }
}
}
fn typeize(f: &str) -> Box<Randomizable> {
let definition: Vec<&str> = f.split(":").collect();
match definition[1] {
"u32" => Box::new(UInt32Field) as Box<Randomizable>,
_ => Box::new(StringField) as Box<Randomizable>
}
}
impl ConfigurableFilter for Random {
fn human_name(&self) -> &str {
self.name.as_ref()
}
fn mandatory_fields(&self) -> Vec<&str> {
vec!["fieldlist", "rate"]
}
}
impl InputProcessor for Random {
fn start(&self, config: &Option<HashMap<String,String>>) -> Receiver<String> {
self.requires_fields(config, self.mandatory_fields());
self.invoke(config, Random::handle_func)
}
fn handle_func(tx: SyncSender<String>, config: Option<HashMap<String,String>>) {
let conf = config.unwrap();
let rate = conf.get("rate").unwrap().clone();
let sleep_duration: u32 = (1000.0f32 / rate.parse::<f32>().unwrap()) as u32;
println!("Random input will sleep for {}", sleep_duration);
let fields: Vec<Box<Randomizable>> = conf.get("fieldlist").unwrap().split(",").map(move |f| typeize(f)).collect();
loop {
let duration = Duration::new(sleep_duration as u64, 0);
sleep(duration);
let mut l = Vec::new();
for f in &fields {
l.push(f.generate());
}
let line = l.join("\t");
match tx.try_send(line.clone()) {
Ok(()) => {},
Err(e) => {
println!("Unable to send line to processor: {}", e);
println!("{}", line)
}
}
}
}
}