1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#[allow(unused_imports)]
use serde_json;
use serde_json::value;
use serde_json::Value;
use chrono::offset::utc::UTC;
/// Converts a numeric log level into its textual name.
///
/// The recognised values are 10, 20, 30, 40, 50 and 60, which map to
/// `trace`, `debug`, `info`, `warn`, `error` and `fatal` respectively.
/// Any other number yields the string `Unknown level <n>`.
#[allow(dead_code)]
fn int_to_level(level: u64) -> String {
    let name = match level {
        10 => "trace",
        20 => "debug",
        30 => "info",
        40 => "warn",
        50 => "error",
        60 => "fatal",
        // Unrecognised numbers are reported back verbatim rather than rejected.
        other => return format!("Unknown level {}", other),
    };
    name.to_string()
}
/// Rewrites a JSON log record in place and returns a copy of the rewritten record.
///
/// When `input_value` is a JSON object the following changes are applied:
///
/// * `time` is renamed to `@timestamp`. When there is no `time` key, `@timestamp` is set
///   to the current UTC time, keeping three digits of the sub-second part and appending
///   whatever `%Z` renders for the time zone.
/// * A `level` holding an unsigned integer is replaced by the name produced by
///   `int_to_level`. Any other `level` value (for example one that is already a string)
///   is left untouched.
/// * `msg` is renamed to `message`.
///
/// A value that is not a JSON object is returned unchanged instead of panicking.
#[allow(dead_code)]
pub fn transform(input_value: &mut Value) -> Value {
    // Check the shape up front so that a malformed record cannot panic the caller.
    if !input_value.is_object() {
        return input_value.clone();
    }
    let input = input_value
        .as_object_mut()
        .expect("is_object() was checked above");

    // `remove` hands back the owned value, so the rename needs neither a clone nor a
    // second lookup.
    match input.remove("time") {
        Some(time) => {
            input.insert("@timestamp".to_string(), time);
        }
        None => {
            let tm = UTC::now();
            let mut timestamp = tm.format("%Y-%m-%dT%H:%M:%S.%f").to_string();
            // The date/time part plus the dot is 20 bytes; keeping 23 leaves the first
            // three digits of the `%f` output.
            timestamp.truncate(23);
            let timestamp_suffix = tm.format("%Z").to_string();
            timestamp.push_str(&timestamp_suffix);
            input.insert("@timestamp".to_string(), value::to_value(&timestamp));
        }
    }

    // Only numeric levels are translated; anything else passes through as-is.
    let numeric_level = input.get("level").and_then(|level| level.as_u64());
    if let Some(level) = numeric_level {
        input.insert("level".to_string(), value::to_value(&int_to_level(level)));
    }

    if let Some(message) = input.remove("msg") {
        input.insert("message".to_string(), message);
    }

    value::to_value(input)
}
/// Builds a `logstash-` index name from the leading part of a timestamp string.
///
/// At most the first 10 bytes of `full_timestamp` are used, every `-` in that prefix is
/// replaced by `.`, and the result is appended to `logstash-`. For example
/// `2015-05-21T10:11:02.132Z` becomes `logstash-2015.05.21`.
///
/// Inputs shorter than 10 bytes are used whole. If byte 10 falls inside a multi-byte
/// character, the prefix is shortened to the previous character boundary instead of
/// panicking.
#[allow(dead_code)]
pub fn time_to_index_name(full_timestamp: &str) -> String {
    const PREFIX_LEN: usize = 10;

    // End offset of the last character that fits entirely within the first
    // PREFIX_LEN bytes; 0 when the input is empty.
    let end = full_timestamp
        .char_indices()
        .map(|(start, ch)| start + ch.len_utf8())
        .take_while(|&stop| stop <= PREFIX_LEN)
        .last()
        .unwrap_or(0);

    format!("logstash-{}", full_timestamp[..end].replace("-", "."))
}
/// `transform` should rename `time` and `msg` and turn the numeric level into its name.
#[test]
fn it_transform_ok() {
    let raw = r#"{"level":30, "msg":"this is a test.", "time": "12"}"#;
    let expected = r#"{"@timestamp":"12","level":"info","message":"this is a test."}"#;

    let mut record: Value = serde_json::from_str(raw).unwrap();
    let rendered = serde_json::to_string(&transform(&mut record)).unwrap();

    assert_eq!(rendered, expected);
}
/// `time_to_index_name` should derive the daily index name from a record's `time` field.
#[test]
fn it_prepares_index_name() {
    let src = r#"{"time": "2015-05-21T10:11:02.132Z"}"#;
    let decode = serde_json::from_str::<Value>(src).unwrap();
    // Fail with a descriptive message if the fixture is broken, rather than a bare
    // `assert!(false)`.
    let time = decode
        .find("time")
        .and_then(|value| value.as_string())
        .expect("fixture must contain a string `time` field");
    assert_eq!("logstash-2015.05.21", time_to_index_name(time));
}