1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, SyncSender};
use std::thread;
use std::sync::mpsc::sync_channel;
use std::thread::JoinHandle;

pub trait ConfigurableFilter {
  fn human_name(&self) -> &str;
  fn mandatory_fields(&self) -> Vec<&str> {
    vec![]
  }

  fn requires_fields(&self, optional_config: &Option<HashMap<String,String>>, required_fields: Vec<&str>) {
    let mut missing_fields = Vec::new();
    match optional_config {
      &Some(ref config) => {
        for required in required_fields {
          if !config.contains_key(required) {
            missing_fields.push(required);
          }
        }
      },
      &None => {missing_fields.extend(&required_fields)}
    }

    if missing_fields.len() > 0 {
      panic!("Missing fields for \"{}\": {:?}", self.human_name(), missing_fields);
    }
  }
}

pub trait InputProcessor: ConfigurableFilter {
  #[allow(unused_variables)]
  fn start(&self, config: &Option<HashMap<String,String>>) -> Receiver<String> {
    panic!("Not implemented");
  }

  #[allow(unused_variables)]
  fn handle_func(tx: SyncSender<String>, config: Option<HashMap<String,String>>) {
    panic!("Not implemented");
  }

  fn invoke(&self, config: &Option<HashMap<String,String>>,
    handle_func: fn(tx: SyncSender<String>, config: Option<HashMap<String,String>>)) -> Receiver<String>
   {
    let (tx, rx) = sync_channel(10000);
    let conf = config.clone();

    let run_loop = thread::Builder::new().name("run_loop".to_string()).spawn(move || {
      handle_func(tx, conf);
    });

    match run_loop {
      Ok(_) => {
        println!("Started Thread for {}", self.human_name());
        rx
      },
      Err(e) => panic!("Unable to spawn {} input thread: {}", self.human_name(), e)
    }
  }
}

pub trait OutputProcessor: ConfigurableFilter {
  fn start(&self, _rx: Receiver<String>, _config: &Option<HashMap<String,String>>) -> Result<JoinHandle<()>, String> {
    panic!("Not implemented");
  }

  #[allow(unused_variables)]
  fn handle_func(rx: Receiver<String>, config: Option<HashMap<String,String>>) {
    panic!("Not implemented");
  }

  fn invoke(&self, rx: Receiver<String>, config: &Option<HashMap<String,String>>,
    handle_func: fn(rx: Receiver<String>, config: Option<HashMap<String,String>>)) -> Result<JoinHandle<()>, String>
   {
    let conf = config.clone();

    let run_loop = thread::Builder::new().name("run_loop".to_string()).spawn(move || {
      handle_func(rx, conf);
    });

    match run_loop {
      Ok(jh) => Ok(jh),
      Err(e) => Err(format!("Unable to spawn {} output thread: {}", self.human_name(), e))
    }
  }
}