| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 | 
							- use rocket::Route;
 
- use rocket_contrib::json::Json;
 
- use serde_json::Value as JsonValue;
 
- use crate::api::JsonResult;
 
- use crate::auth::Headers;
 
- use crate::db::DbConn;
 
- use crate::CONFIG;
 
- pub fn routes() -> Vec<Route> {
 
-     routes![negotiate, websockets_err]
 
- }
 
- #[get("/hub")]
 
- fn websockets_err() -> JsonResult {
 
-     err!("'/notifications/hub' should be proxied to the websocket server or notifications won't work. Go to the README for more info.")
 
- }
 
- #[post("/hub/negotiate")]
 
- fn negotiate(_headers: Headers, _conn: DbConn) -> JsonResult {
 
-     use crate::crypto;
 
-     use data_encoding::BASE64URL;
 
-     let conn_id = BASE64URL.encode(&crypto::get_random(vec![0u8; 16]));
 
-     let mut available_transports: Vec<JsonValue> = Vec::new();
 
-     if CONFIG.websocket_enabled {
 
-         available_transports.push(json!({"transport":"WebSockets", "transferFormats":["Text","Binary"]}));
 
-     }
 
-     // TODO: Implement transports
 
-     // Rocket WS support: https://github.com/SergioBenitez/Rocket/issues/90
 
-     // Rocket SSE support: https://github.com/SergioBenitez/Rocket/issues/33
 
-     // {"transport":"ServerSentEvents", "transferFormats":["Text"]},
 
-     // {"transport":"LongPolling", "transferFormats":["Text","Binary"]}
 
-     Ok(Json(json!({
 
-         "connectionId": conn_id,
 
-         "availableTransports": available_transports
 
-     })))
 
- }
 
- //
 
- // Websockets server
 
- //
 
- use std::sync::Arc;
 
- use std::thread;
 
- use ws::{self, util::Token, Factory, Handler, Handshake, Message, Sender, WebSocket};
 
- use chashmap::CHashMap;
 
- use chrono::NaiveDateTime;
 
- use serde_json::from_str;
 
- use crate::db::models::{Cipher, Folder, User};
 
- use rmpv::Value;
 
- fn serialize(val: Value) -> Vec<u8> {
 
-     use rmpv::encode::write_value;
 
-     let mut buf = Vec::new();
 
-     write_value(&mut buf, &val).expect("Error encoding MsgPack");
 
-     // Add size bytes at the start
 
-     // Extracted from BinaryMessageFormat.js
 
-     let mut size: usize = buf.len();
 
-     let mut len_buf: Vec<u8> = Vec::new();
 
-     loop {
 
-         let mut size_part = size & 0x7f;
 
-         size >>= 7;
 
-         if size > 0 {
 
-             size_part |= 0x80;
 
-         }
 
-         len_buf.push(size_part as u8);
 
-         if size == 0 {
 
-             break;
 
-         }
 
-     }
 
-     len_buf.append(&mut buf);
 
-     len_buf
 
- }
 
- fn serialize_date(date: NaiveDateTime) -> Value {
 
-     let seconds: i64 = date.timestamp();
 
-     let nanos: i64 = date.timestamp_subsec_nanos() as i64;
 
-     let timestamp = nanos << 34 | seconds;
 
-     use byteorder::{BigEndian, WriteBytesExt};
 
-     let mut bs = [0u8; 8];
 
-     bs.as_mut().write_i64::<BigEndian>(timestamp).expect("Unable to write");
 
-     // -1 is Timestamp
 
-     // https://github.com/msgpack/msgpack/blob/master/spec.md#timestamp-extension-type
 
-     Value::Ext(-1, bs.to_vec())
 
- }
 
- fn convert_option<T: Into<Value>>(option: Option<T>) -> Value {
 
-     match option {
 
-         Some(a) => a.into(),
 
-         None => Value::Nil,
 
-     }
 
- }
 
- // Server WebSocket handler
 
- pub struct WSHandler {
 
-     out: Sender,
 
-     user_uuid: Option<String>,
 
-     users: WebSocketUsers,
 
- }
 
- const RECORD_SEPARATOR: u8 = 0x1e;
 
- const INITIAL_RESPONSE: [u8; 3] = [0x7b, 0x7d, RECORD_SEPARATOR]; // {, }, <RS>
 
- #[derive(Deserialize)]
 
- struct InitialMessage {
 
-     protocol: String,
 
-     version: i32,
 
- }
 
- const PING_MS: u64 = 15_000;
 
- const PING: Token = Token(1);
 
- impl Handler for WSHandler {
 
-     fn on_open(&mut self, hs: Handshake) -> ws::Result<()> {
 
-         // TODO: Improve this split
 
-         let path = hs.request.resource();
 
-         let mut query_split: Vec<_> = path.split('?').nth(1).unwrap().split('&').collect();
 
-         query_split.sort();
 
-         let access_token = &query_split[0][13..];
 
-         let _id = &query_split[1][3..];
 
-         // Validate the user
 
-         use crate::auth;
 
-         let claims = match auth::decode_jwt(access_token) {
 
-             Ok(claims) => claims,
 
-             Err(_) => return Err(ws::Error::new(ws::ErrorKind::Internal, "Invalid access token provided")),
 
-         };
 
-         // Assign the user to the handler
 
-         let user_uuid = claims.sub;
 
-         self.user_uuid = Some(user_uuid.clone());
 
-         // Add the current Sender to the user list
 
-         let handler_insert = self.out.clone();
 
-         let handler_update = self.out.clone();
 
-         self.users
 
-             .map
 
-             .upsert(user_uuid, || vec![handler_insert], |ref mut v| v.push(handler_update));
 
-         // Schedule a ping to keep the connection alive
 
-         self.out.timeout(PING_MS, PING)
 
-     }
 
-     fn on_message(&mut self, msg: Message) -> ws::Result<()> {
 
-         info!("Server got message '{}'. ", msg);
 
-         if let Message::Text(text) = msg.clone() {
 
-             let json = &text[..text.len() - 1]; // Remove last char
 
-             if let Ok(InitialMessage { protocol, version }) = from_str::<InitialMessage>(json) {
 
-                 if &protocol == "messagepack" && version == 1 {
 
-                     return self.out.send(&INITIAL_RESPONSE[..]); // Respond to initial message
 
-                 }
 
-             }
 
-         }
 
-         // If it's not the initial message, just echo the message
 
-         self.out.send(msg)
 
-     }
 
-     fn on_timeout(&mut self, event: Token) -> ws::Result<()> {
 
-         if event == PING {
 
-             // send ping
 
-             self.out.send(create_ping())?;
 
-             // reschedule the timeout
 
-             self.out.timeout(PING_MS, PING)
 
-         } else {
 
-             Err(ws::Error::new(
 
-                 ws::ErrorKind::Internal,
 
-                 "Invalid timeout token provided",
 
-             ))
 
-         }
 
-     }
 
- }
 
- struct WSFactory {
 
-     pub users: WebSocketUsers,
 
- }
 
- impl WSFactory {
 
-     pub fn init() -> Self {
 
-         WSFactory {
 
-             users: WebSocketUsers {
 
-                 map: Arc::new(CHashMap::new()),
 
-             },
 
-         }
 
-     }
 
- }
 
- impl Factory for WSFactory {
 
-     type Handler = WSHandler;
 
-     fn connection_made(&mut self, out: Sender) -> Self::Handler {
 
-         WSHandler {
 
-             out,
 
-             user_uuid: None,
 
-             users: self.users.clone(),
 
-         }
 
-     }
 
-     fn connection_lost(&mut self, handler: Self::Handler) {
 
-         // Remove handler
 
-         if let Some(user_uuid) = &handler.user_uuid {
 
-             if let Some(mut user_conn) = self.users.map.get_mut(user_uuid) {
 
-                 user_conn.remove_item(&handler.out);
 
-             }
 
-         }
 
-     }
 
- }
 
- #[derive(Clone)]
 
- pub struct WebSocketUsers {
 
-     map: Arc<CHashMap<String, Vec<Sender>>>,
 
- }
 
- impl WebSocketUsers {
 
-     fn send_update(&self, user_uuid: &String, data: &[u8]) -> ws::Result<()> {
 
-         if let Some(user) = self.map.get(user_uuid) {
 
-             for sender in user.iter() {
 
-                 sender.send(data)?;
 
-             }
 
-         }
 
-         Ok(())
 
-     }
 
-     // NOTE: The last modified date needs to be updated before calling these methods
 
-     #[allow(dead_code)]
 
-     pub fn send_user_update(&self, ut: UpdateType, user: &User) {
 
-         let data = create_update(
 
-             vec![
 
-                 ("UserId".into(), user.uuid.clone().into()),
 
-                 ("Date".into(), serialize_date(user.updated_at)),
 
-             ],
 
-             ut,
 
-         );
 
-         self.send_update(&user.uuid.clone(), &data).ok();
 
-     }
 
-     pub fn send_folder_update(&self, ut: UpdateType, folder: &Folder) {
 
-         let data = create_update(
 
-             vec![
 
-                 ("Id".into(), folder.uuid.clone().into()),
 
-                 ("UserId".into(), folder.user_uuid.clone().into()),
 
-                 ("RevisionDate".into(), serialize_date(folder.updated_at)),
 
-             ],
 
-             ut,
 
-         );
 
-         self.send_update(&folder.user_uuid, &data).ok();
 
-     }
 
-     pub fn send_cipher_update(&self, ut: UpdateType, cipher: &Cipher, user_uuids: &[String]) {
 
-         let user_uuid = convert_option(cipher.user_uuid.clone());
 
-         let org_uuid = convert_option(cipher.organization_uuid.clone());
 
-         let data = create_update(
 
-             vec![
 
-                 ("Id".into(), cipher.uuid.clone().into()),
 
-                 ("UserId".into(), user_uuid),
 
-                 ("OrganizationId".into(), org_uuid),
 
-                 ("CollectionIds".into(), Value::Nil),
 
-                 ("RevisionDate".into(), serialize_date(cipher.updated_at)),
 
-             ],
 
-             ut,
 
-         );
 
-         for uuid in user_uuids {
 
-             self.send_update(&uuid, &data).ok();
 
-         }
 
-     }
 
- }
 
- /* Message Structure
 
- [
 
-     1, // MessageType.Invocation
 
-     {}, // Headers
 
-     null, // InvocationId
 
-     "ReceiveMessage", // Target
 
-     [ // Arguments
 
-         {
 
-             "ContextId": "app_id",
 
-             "Type": ut as i32,
 
-             "Payload": {}
 
-         }
 
-     ]
 
- ]
 
- */
 
- fn create_update(payload: Vec<(Value, Value)>, ut: UpdateType) -> Vec<u8> {
 
-     use rmpv::Value as V;
 
-     let value = V::Array(vec![
 
-         1.into(),
 
-         V::Array(vec![]),
 
-         V::Nil,
 
-         "ReceiveMessage".into(),
 
-         V::Array(vec![V::Map(vec![
 
-             ("ContextId".into(), "app_id".into()),
 
-             ("Type".into(), (ut as i32).into()),
 
-             ("Payload".into(), payload.into()),
 
-         ])]),
 
-     ]);
 
-     serialize(value)
 
- }
 
- fn create_ping() -> Vec<u8> {
 
-     serialize(Value::Array(vec![6.into()]))
 
- }
 
- #[allow(dead_code)]
 
- pub enum UpdateType {
 
-     CipherUpdate = 0,
 
-     CipherCreate = 1,
 
-     LoginDelete = 2,
 
-     FolderDelete = 3,
 
-     Ciphers = 4,
 
-     Vault = 5,
 
-     OrgKeys = 6,
 
-     FolderCreate = 7,
 
-     FolderUpdate = 8,
 
-     CipherDelete = 9,
 
-     SyncSettings = 10,
 
-     LogOut = 11,
 
- }
 
- use rocket::State;
 
- pub type Notify<'a> = State<'a, WebSocketUsers>;
 
- pub fn start_notification_server() -> WebSocketUsers {
 
-     let factory = WSFactory::init();
 
-     let users = factory.users.clone();
 
-     if CONFIG.websocket_enabled {
 
-         thread::spawn(move || {
 
-             WebSocket::new(factory).unwrap().listen(&CONFIG.websocket_url).unwrap();
 
-         });
 
-     }
 
-     users
 
- }
 
 
  |