stream.js 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. const _ = require('lodash');
  2. const error = require('../lib/error');
  3. const streamModel = require('../models/stream');
  4. const internalNginx = require('./nginx');
  5. const internalAuditLog = require('./audit-log');
  /**
   * Lists the column names that this module strips from stream rows,
   * both through the query builder's `omit()` and through `_.omit()`.
   *
   * A new array is built on every call, so callers may mutate the result.
   *
   * @returns {Array<String>}
   */
  function omissions () {
    const hiddenColumns = ['is_deleted'];
    return hiddenColumns;
  }
  /**
   * Internal service for stream records: permission checks, persistence through
   * `streamModel`, nginx (re)configuration and audit logging.
   *
   * Every method that takes an `access` object first asks `access.can(...)` for
   * the relevant `streams:*` permission and only proceeds once that resolves.
   */
  const internalStream = {

    /**
     * Inserts a new stream, configures nginx for it and records a `created`
     * audit entry.
     *
     * Note: `data` is mutated in place (`owner_user_id` is set and `meta` is
     * defaulted to `{}`), and that same object is stored as the audit entry's
     * `meta`.
     *
     * @param   {Access}  access
     * @param   {Object}  data
     * @returns {Promise} resolves with the created row as returned by `get` (owner expanded)
     */
    create: (access, data) => {
      return access.can('streams:create', data)
        .then((/*access_data*/) => {
          // TODO: At this point the existing ports should have been checked
          // NOTE(review): the meaning of the `1` argument to getUserId is not visible in this file
          data.owner_user_id = access.token.getUserId(1);

          if (typeof data.meta === 'undefined') {
            data.meta = {};
          }

          return streamModel
            .query()
            .omit(omissions())
            .insertAndFetch(data);
        })
        .then((row) => {
          // Configure nginx, then re-read the row so the caller gets the expanded owner
          return internalNginx.configure(streamModel, 'stream', row)
            .then(() => {
              return internalStream.get(access, {id: row.id, expand: ['owner']});
            });
        })
        .then((row) => {
          // Add to audit log
          return internalAuditLog.add(access, {
            action:      'created',
            object_type: 'stream',
            object_id:   row.id,
            meta:        data
          })
            .then(() => {
              return row;
            });
        });
    },

    /**
     * Patches an existing stream, reconfigures nginx and records an `updated`
     * audit entry.
     *
     * @param   {Access}  access
     * @param   {Object}  data
     * @param   {Number}  data.id
     * @returns {Promise} resolves with the updated row (owner expanded, omitted columns removed)
     * @throws  {error.InternalValidationError} when the fetched row's id differs from `data.id`
     * @throws  {error.ItemNotFoundError} (via `get`) when no matching row is visible
     */
    update: (access, data) => {
      return access.can('streams:update', data.id)
        .then((/*access_data*/) => {
          // TODO: at this point the existing streams should have been checked
          return internalStream.get(access, {id: data.id});
        })
        .then((row) => {
          if (row.id !== data.id) {
            // Sanity check that something crazy hasn't happened
            throw new error.InternalValidationError(`Stream could not be updated, IDs do not match: ${row.id} !== ${data.id}`);
          }

          return streamModel
            .query()
            .omit(omissions())
            .patchAndFetchById(row.id, data)
            .then((saved_row) => {
              return internalNginx.configure(streamModel, 'stream', saved_row)
                .then(() => {
                  return internalStream.get(access, {id: row.id, expand: ['owner']});
                });
            })
            .then((saved_row) => {
              // Add to audit log
              return internalAuditLog.add(access, {
                action:      'updated',
                object_type: 'stream',
                object_id:   row.id,
                meta:        data
              })
                .then(() => {
                  return _.omit(saved_row, omissions());
                });
            });
        });
    },

    /**
     * Fetches a single non-deleted stream by id.
     *
     * When the resolved permission's `permission_visibility` is not `'all'`, the
     * lookup is additionally restricted to rows owned by the current user.
     *
     * @param   {Access}  access
     * @param   {Object}  [data]          defaults to `{}` when undefined
     * @param   {Number}  data.id
     * @param   {Array}   [data.expand]   relation names to eager-load
     * @param   {Array}   [data.omit]     extra columns to omit from the result
     * @returns {Promise} resolves with the row, minus the `omissions()` columns
     * @throws  {error.ItemNotFoundError} when the query yields no row
     */
    get: (access, data = {}) => {
      return access.can('streams:get', data.id)
        .then((access_data) => {
          const query = streamModel
            .query()
            .where('is_deleted', 0)
            .andWhere('id', data.id)
            .allowEager('[owner]')
            .first();

          if (access_data.permission_visibility !== 'all') {
            query.andWhere('owner_user_id', access.token.getUserId(1));
          }

          // Custom omissions
          if (typeof data.omit !== 'undefined' && data.omit !== null) {
            query.omit(data.omit);
          }

          if (typeof data.expand !== 'undefined' && data.expand !== null) {
            query.eager(`[${data.expand.join(', ')}]`);
          }

          return query;
        })
        .then((row) => {
          if (!row) {
            throw new error.ItemNotFoundError(data.id);
          }

          return _.omit(row, omissions());
        });
    },

    /**
     * Soft-deletes a stream (sets `is_deleted` to 1), removes its nginx config,
     * reloads nginx and records a `deleted` audit entry.
     *
     * @param   {Access}  access
     * @param   {Object}  data
     * @param   {Number}  data.id
     * @param   {String}  [data.reason]   accepted but not read by this method
     * @returns {Promise} resolves with `true`
     * @throws  {error.ItemNotFoundError} when the stream cannot be found
     */
    delete: (access, data) => {
      return access.can('streams:delete', data.id)
        .then(() => {
          return internalStream.get(access, {id: data.id});
        })
        .then((row) => {
          // Defensive: `get` already throws when nothing is found
          if (!row) {
            throw new error.ItemNotFoundError(data.id);
          }

          return streamModel
            .query()
            .where('id', row.id)
            .patch({
              is_deleted: 1
            })
            .then(() => {
              // Delete Nginx Config
              return internalNginx.deleteConfig('stream', row)
                .then(() => {
                  return internalNginx.reload();
                });
            })
            .then(() => {
              // Add to audit log
              return internalAuditLog.add(access, {
                action:      'deleted',
                object_type: 'stream',
                object_id:   row.id,
                meta:        _.omit(row, omissions())
              });
            });
        })
        .then(() => {
          return true;
        });
    },

    /**
     * Marks a stream as enabled, configures nginx for it and records an
     * `enabled` audit entry.
     *
     * @param   {Access}  access
     * @param   {Object}  data
     * @param   {Number}  data.id
     * @param   {String}  [data.reason]   accepted but not read by this method
     * @returns {Promise} resolves with `true`
     * @throws  {error.ItemNotFoundError} when the stream cannot be found
     * @throws  {error.ValidationError} when the stream is already enabled
     */
    enable: (access, data) => {
      return access.can('streams:update', data.id)
        .then(() => {
          return internalStream.get(access, {
            id:     data.id,
            expand: ['owner']
          });
        })
        .then((row) => {
          // Defensive: `get` already throws when nothing is found
          if (!row) {
            throw new error.ItemNotFoundError(data.id);
          } else if (row.enabled) {
            throw new error.ValidationError('Stream is already enabled');
          }

          // Keep the in-memory row in step with the patch so nginx and the audit log see the new state
          row.enabled = 1;

          return streamModel
            .query()
            .where('id', row.id)
            .patch({
              enabled: 1
            })
            .then(() => {
              // Configure nginx
              return internalNginx.configure(streamModel, 'stream', row);
            })
            .then(() => {
              // Add to audit log
              return internalAuditLog.add(access, {
                action:      'enabled',
                object_type: 'stream',
                object_id:   row.id,
                meta:        _.omit(row, omissions())
              });
            });
        })
        .then(() => {
          return true;
        });
    },

    /**
     * Marks a stream as disabled, removes its nginx config, reloads nginx and
     * records a `disabled` audit entry.
     *
     * The audit entry uses `object_type: 'stream'`, matching every other action
     * in this module.
     *
     * @param   {Access}  access
     * @param   {Object}  data
     * @param   {Number}  data.id
     * @param   {String}  [data.reason]   accepted but not read by this method
     * @returns {Promise} resolves with `true`
     * @throws  {error.ItemNotFoundError} when the stream cannot be found
     * @throws  {error.ValidationError} when the stream is already disabled
     */
    disable: (access, data) => {
      return access.can('streams:update', data.id)
        .then(() => {
          return internalStream.get(access, {id: data.id});
        })
        .then((row) => {
          // Defensive: `get` already throws when nothing is found
          if (!row) {
            throw new error.ItemNotFoundError(data.id);
          } else if (!row.enabled) {
            throw new error.ValidationError('Stream is already disabled');
          }

          // Keep the in-memory row in step with the patch so the audit log sees the new state
          row.enabled = 0;

          return streamModel
            .query()
            .where('id', row.id)
            .patch({
              enabled: 0
            })
            .then(() => {
              // Delete Nginx Config
              return internalNginx.deleteConfig('stream', row)
                .then(() => {
                  return internalNginx.reload();
                });
            })
            .then(() => {
              // Add to audit log
              return internalAuditLog.add(access, {
                action:      'disabled',
                object_type: 'stream',
                object_id:   row.id,
                meta:        _.omit(row, omissions())
              });
            });
        })
        .then(() => {
          return true;
        });
    },

    /**
     * All Streams
     *
     * Lists non-deleted streams ordered by `incoming_port` ascending. When the
     * resolved permission's `permission_visibility` is not `'all'`, only rows
     * owned by the current user are returned.
     *
     * @param   {Access}  access
     * @param   {Array}   [expand]        relation names to eager-load
     * @param   {String}  [search_query]  substring matched against `incoming_port`
     * @returns {Promise}
     */
    getAll: (access, expand, search_query) => {
      return access.can('streams:list')
        .then((access_data) => {
          const query = streamModel
            .query()
            .where('is_deleted', 0)
            .groupBy('id')
            .omit(omissions())
            .allowEager('[owner]')
            .orderBy('incoming_port', 'ASC');

          if (access_data.permission_visibility !== 'all') {
            query.andWhere('owner_user_id', access.token.getUserId(1));
          }

          // Query is used for searching
          if (typeof search_query === 'string') {
            // Regular function on purpose: the query builder supplies `this`
            query.where(function () {
              this.where('incoming_port', 'like', `%${search_query}%`);
            });
          }

          if (typeof expand !== 'undefined' && expand !== null) {
            query.eager(`[${expand.join(', ')}]`);
          }

          return query;
        });
    },

    /**
     * Report use
     *
     * Counts non-deleted streams; unless `visibility` is `'all'` only those owned
     * by `user_id` are counted.
     *
     * @param   {Number}  user_id
     * @param   {String}  visibility
     * @returns {Promise} resolves with the count parsed as a base-10 integer
     */
    getCount: (user_id, visibility) => {
      const query = streamModel
        .query()
        .count('id as count')
        .where('is_deleted', 0);

      if (visibility !== 'all') {
        query.andWhere('owner_user_id', user_id);
      }

      return query.first()
        .then((row) => {
          return parseInt(row.count, 10);
        });
    }
  };
  318. module.exports = internalStream;