notifications.rs 9.5 KB


  1. use rocket::Route;
  2. use rocket_contrib::json::Json;
  3. use serde_json::Value as JsonValue;
  4. use crate::api::JsonResult;
  5. use crate::auth::Headers;
  6. use crate::db::DbConn;
  7. use crate::CONFIG;
  8. pub fn routes() -> Vec<Route> {
  9. routes![negotiate, websockets_err]
  10. }
  11. #[get("/hub")]
  12. fn websockets_err() -> JsonResult {
  13. err!("'/notifications/hub' should be proxied to the websocket server or notifications won't work. Go to the README for more info.")
  14. }
  15. #[post("/hub/negotiate")]
  16. fn negotiate(_headers: Headers, _conn: DbConn) -> JsonResult {
  17. use crate::crypto;
  18. use data_encoding::BASE64URL;
  19. let conn_id = BASE64URL.encode(&crypto::get_random(vec![0u8; 16]));
  20. let mut available_transports: Vec<JsonValue> = Vec::new();
  21. if CONFIG.websocket_enabled {
  22. available_transports.push(json!({"transport":"WebSockets", "transferFormats":["Text","Binary"]}));
  23. }
  24. // TODO: Implement transports
  25. // Rocket WS support: https://github.com/SergioBenitez/Rocket/issues/90
  26. // Rocket SSE support: https://github.com/SergioBenitez/Rocket/issues/33
  27. // {"transport":"ServerSentEvents", "transferFormats":["Text"]},
  28. // {"transport":"LongPolling", "transferFormats":["Text","Binary"]}
  29. Ok(Json(json!({
  30. "connectionId": conn_id,
  31. "availableTransports": available_transports
  32. })))
  33. }
  34. //
  35. // Websockets server
  36. //
  37. use std::sync::Arc;
  38. use std::thread;
  39. use ws::{self, util::Token, Factory, Handler, Handshake, Message, Sender, WebSocket};
  40. use chashmap::CHashMap;
  41. use chrono::NaiveDateTime;
  42. use serde_json::from_str;
  43. use crate::db::models::{Cipher, Folder, User};
  44. use rmpv::Value;
  45. fn serialize(val: Value) -> Vec<u8> {
  46. use rmpv::encode::write_value;
  47. let mut buf = Vec::new();
  48. write_value(&mut buf, &val).expect("Error encoding MsgPack");
  49. // Add size bytes at the start
  50. // Extracted from BinaryMessageFormat.js
  51. let mut size: usize = buf.len();
  52. let mut len_buf: Vec<u8> = Vec::new();
  53. loop {
  54. let mut size_part = size & 0x7f;
  55. size >>= 7;
  56. if size > 0 {
  57. size_part |= 0x80;
  58. }
  59. len_buf.push(size_part as u8);
  60. if size == 0 {
  61. break;
  62. }
  63. }
  64. len_buf.append(&mut buf);
  65. len_buf
  66. }
  67. fn serialize_date(date: NaiveDateTime) -> Value {
  68. let seconds: i64 = date.timestamp();
  69. let nanos: i64 = date.timestamp_subsec_nanos() as i64;
  70. let timestamp = nanos << 34 | seconds;
  71. use byteorder::{BigEndian, WriteBytesExt};
  72. let mut bs = [0u8; 8];
  73. bs.as_mut().write_i64::<BigEndian>(timestamp).expect("Unable to write");
  74. // -1 is Timestamp
  75. // https://github.com/msgpack/msgpack/blob/master/spec.md#timestamp-extension-type
  76. Value::Ext(-1, bs.to_vec())
  77. }
  78. fn convert_option<T: Into<Value>>(option: Option<T>) -> Value {
  79. match option {
  80. Some(a) => a.into(),
  81. None => Value::Nil,
  82. }
  83. }
  84. // Server WebSocket handler
  85. pub struct WSHandler {
  86. out: Sender,
  87. user_uuid: Option<String>,
  88. users: WebSocketUsers,
  89. }
  90. const RECORD_SEPARATOR: u8 = 0x1e;
  91. const INITIAL_RESPONSE: [u8; 3] = [0x7b, 0x7d, RECORD_SEPARATOR]; // {, }, <RS>
  92. #[derive(Deserialize)]
  93. struct InitialMessage {
  94. protocol: String,
  95. version: i32,
  96. }
  97. const PING_MS: u64 = 15_000;
  98. const PING: Token = Token(1);
  99. impl Handler for WSHandler {
  100. fn on_open(&mut self, hs: Handshake) -> ws::Result<()> {
  101. // TODO: Improve this split
  102. let path = hs.request.resource();
  103. let mut query_split: Vec<_> = path.split('?').nth(1).unwrap().split('&').collect();
  104. query_split.sort();
  105. let access_token = &query_split[0][13..];
  106. let _id = &query_split[1][3..];
  107. // Validate the user
  108. use crate::auth;
  109. let claims = match auth::decode_jwt(access_token) {
  110. Ok(claims) => claims,
  111. Err(_) => return Err(ws::Error::new(ws::ErrorKind::Internal, "Invalid access token provided")),
  112. };
  113. // Assign the user to the handler
  114. let user_uuid = claims.sub;
  115. self.user_uuid = Some(user_uuid.clone());
  116. // Add the current Sender to the user list
  117. let handler_insert = self.out.clone();
  118. let handler_update = self.out.clone();
  119. self.users
  120. .map
  121. .upsert(user_uuid, || vec![handler_insert], |ref mut v| v.push(handler_update));
  122. // Schedule a ping to keep the connection alive
  123. self.out.timeout(PING_MS, PING)
  124. }
  125. fn on_message(&mut self, msg: Message) -> ws::Result<()> {
  126. info!("Server got message '{}'. ", msg);
  127. if let Message::Text(text) = msg.clone() {
  128. let json = &text[..text.len() - 1]; // Remove last char
  129. if let Ok(InitialMessage { protocol, version }) = from_str::<InitialMessage>(json) {
  130. if &protocol == "messagepack" && version == 1 {
  131. return self.out.send(&INITIAL_RESPONSE[..]); // Respond to initial message
  132. }
  133. }
  134. }
  135. // If it's not the initial message, just echo the message
  136. self.out.send(msg)
  137. }
  138. fn on_timeout(&mut self, event: Token) -> ws::Result<()> {
  139. if event == PING {
  140. // send ping
  141. self.out.send(create_ping())?;
  142. // reschedule the timeout
  143. self.out.timeout(PING_MS, PING)
  144. } else {
  145. Err(ws::Error::new(
  146. ws::ErrorKind::Internal,
  147. "Invalid timeout token provided",
  148. ))
  149. }
  150. }
  151. }
  152. struct WSFactory {
  153. pub users: WebSocketUsers,
  154. }
  155. impl WSFactory {
  156. pub fn init() -> Self {
  157. WSFactory {
  158. users: WebSocketUsers {
  159. map: Arc::new(CHashMap::new()),
  160. },
  161. }
  162. }
  163. }
  164. impl Factory for WSFactory {
  165. type Handler = WSHandler;
  166. fn connection_made(&mut self, out: Sender) -> Self::Handler {
  167. WSHandler {
  168. out,
  169. user_uuid: None,
  170. users: self.users.clone(),
  171. }
  172. }
  173. fn connection_lost(&mut self, handler: Self::Handler) {
  174. // Remove handler
  175. if let Some(user_uuid) = &handler.user_uuid {
  176. if let Some(mut user_conn) = self.users.map.get_mut(user_uuid) {
  177. user_conn.remove_item(&handler.out);
  178. }
  179. }
  180. }
  181. }
  182. #[derive(Clone)]
  183. pub struct WebSocketUsers {
  184. map: Arc<CHashMap<String, Vec<Sender>>>,
  185. }
  186. impl WebSocketUsers {
  187. fn send_update(&self, user_uuid: &String, data: &[u8]) -> ws::Result<()> {
  188. if let Some(user) = self.map.get(user_uuid) {
  189. for sender in user.iter() {
  190. sender.send(data)?;
  191. }
  192. }
  193. Ok(())
  194. }
  195. // NOTE: The last modified date needs to be updated before calling these methods
  196. #[allow(dead_code)]
  197. pub fn send_user_update(&self, ut: UpdateType, user: &User) {
  198. let data = create_update(
  199. vec![
  200. ("UserId".into(), user.uuid.clone().into()),
  201. ("Date".into(), serialize_date(user.updated_at)),
  202. ],
  203. ut,
  204. );
  205. self.send_update(&user.uuid.clone(), &data).ok();
  206. }
  207. pub fn send_folder_update(&self, ut: UpdateType, folder: &Folder) {
  208. let data = create_update(
  209. vec![
  210. ("Id".into(), folder.uuid.clone().into()),
  211. ("UserId".into(), folder.user_uuid.clone().into()),
  212. ("RevisionDate".into(), serialize_date(folder.updated_at)),
  213. ],
  214. ut,
  215. );
  216. self.send_update(&folder.user_uuid, &data).ok();
  217. }
  218. pub fn send_cipher_update(&self, ut: UpdateType, cipher: &Cipher, user_uuids: &[String]) {
  219. let user_uuid = convert_option(cipher.user_uuid.clone());
  220. let org_uuid = convert_option(cipher.organization_uuid.clone());
  221. let data = create_update(
  222. vec![
  223. ("Id".into(), cipher.uuid.clone().into()),
  224. ("UserId".into(), user_uuid),
  225. ("OrganizationId".into(), org_uuid),
  226. ("CollectionIds".into(), Value::Nil),
  227. ("RevisionDate".into(), serialize_date(cipher.updated_at)),
  228. ],
  229. ut,
  230. );
  231. for uuid in user_uuids {
  232. self.send_update(&uuid, &data).ok();
  233. }
  234. }
  235. }
  236. /* Message Structure
  237. [
  238. 1, // MessageType.Invocation
  239. {}, // Headers
  240. null, // InvocationId
  241. "ReceiveMessage", // Target
  242. [ // Arguments
  243. {
  244. "ContextId": "app_id",
  245. "Type": ut as i32,
  246. "Payload": {}
  247. }
  248. ]
  249. ]
  250. */
  251. fn create_update(payload: Vec<(Value, Value)>, ut: UpdateType) -> Vec<u8> {
  252. use rmpv::Value as V;
  253. let value = V::Array(vec![
  254. 1.into(),
  255. V::Array(vec![]),
  256. V::Nil,
  257. "ReceiveMessage".into(),
  258. V::Array(vec![V::Map(vec![
  259. ("ContextId".into(), "app_id".into()),
  260. ("Type".into(), (ut as i32).into()),
  261. ("Payload".into(), payload.into()),
  262. ])]),
  263. ]);
  264. serialize(value)
  265. }
  266. fn create_ping() -> Vec<u8> {
  267. serialize(Value::Array(vec![6.into()]))
  268. }
  269. #[allow(dead_code)]
  270. pub enum UpdateType {
  271. CipherUpdate = 0,
  272. CipherCreate = 1,
  273. LoginDelete = 2,
  274. FolderDelete = 3,
  275. Ciphers = 4,
  276. Vault = 5,
  277. OrgKeys = 6,
  278. FolderCreate = 7,
  279. FolderUpdate = 8,
  280. CipherDelete = 9,
  281. SyncSettings = 10,
  282. LogOut = 11,
  283. }
  284. use rocket::State;
  285. pub type Notify<'a> = State<'a, WebSocketUsers>;
  286. pub fn start_notification_server() -> WebSocketUsers {
  287. let factory = WSFactory::init();
  288. let users = factory.users.clone();
  289. if CONFIG.websocket_enabled {
  290. thread::spawn(move || {
  291. WebSocket::new(factory).unwrap().listen(&CONFIG.websocket_url).unwrap();
  292. });
  293. }
  294. users
  295. }