stream.js 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. const _ = require('lodash');
  2. const error = require('../lib/error');
  3. const utils = require('../lib/utils');
  4. const streamModel = require('../models/stream');
  5. const internalNginx = require('./nginx');
  6. const internalAuditLog = require('./audit-log');
  7. const {castJsonIfNeed} = require('../lib/helpers');
  8. function omissions () {
  9. return ['is_deleted'];
  10. }
  11. const internalStream = {
  12. /**
  13. * @param {Access} access
  14. * @param {Object} data
  15. * @returns {Promise}
  16. */
  17. create: (access, data) => {
  18. return access.can('streams:create', data)
  19. .then((/*access_data*/) => {
  20. // TODO: At this point the existing ports should have been checked
  21. data.owner_user_id = access.token.getUserId(1);
  22. if (typeof data.meta === 'undefined') {
  23. data.meta = {};
  24. }
  25. return streamModel
  26. .query()
  27. .insertAndFetch(data)
  28. .then(utils.omitRow(omissions()));
  29. })
  30. .then((row) => {
  31. // Configure nginx
  32. return internalNginx.configure(streamModel, 'stream', row)
  33. .then(() => {
  34. return internalStream.get(access, {id: row.id, expand: ['owner']});
  35. });
  36. })
  37. .then((row) => {
  38. // Add to audit log
  39. return internalAuditLog.add(access, {
  40. action: 'created',
  41. object_type: 'stream',
  42. object_id: row.id,
  43. meta: data
  44. })
  45. .then(() => {
  46. return row;
  47. });
  48. });
  49. },
  50. /**
  51. * @param {Access} access
  52. * @param {Object} data
  53. * @param {Number} data.id
  54. * @return {Promise}
  55. */
  56. update: (access, data) => {
  57. return access.can('streams:update', data.id)
  58. .then((/*access_data*/) => {
  59. // TODO: at this point the existing streams should have been checked
  60. return internalStream.get(access, {id: data.id});
  61. })
  62. .then((row) => {
  63. if (row.id !== data.id) {
  64. // Sanity check that something crazy hasn't happened
  65. throw new error.InternalValidationError('Stream could not be updated, IDs do not match: ' + row.id + ' !== ' + data.id);
  66. }
  67. return streamModel
  68. .query()
  69. .patchAndFetchById(row.id, data)
  70. .then(utils.omitRow(omissions()))
  71. .then((saved_row) => {
  72. return internalNginx.configure(streamModel, 'stream', saved_row)
  73. .then(() => {
  74. return internalStream.get(access, {id: row.id, expand: ['owner']});
  75. });
  76. })
  77. .then((saved_row) => {
  78. // Add to audit log
  79. return internalAuditLog.add(access, {
  80. action: 'updated',
  81. object_type: 'stream',
  82. object_id: row.id,
  83. meta: data
  84. })
  85. .then(() => {
  86. return saved_row;
  87. });
  88. });
  89. });
  90. },
  91. /**
  92. * @param {Access} access
  93. * @param {Object} data
  94. * @param {Number} data.id
  95. * @param {Array} [data.expand]
  96. * @param {Array} [data.omit]
  97. * @return {Promise}
  98. */
  99. get: (access, data) => {
  100. if (typeof data === 'undefined') {
  101. data = {};
  102. }
  103. return access.can('streams:get', data.id)
  104. .then((access_data) => {
  105. let query = streamModel
  106. .query()
  107. .where('is_deleted', 0)
  108. .andWhere('id', data.id)
  109. .allowGraph('[owner]')
  110. .first();
  111. if (access_data.permission_visibility !== 'all') {
  112. query.andWhere('owner_user_id', access.token.getUserId(1));
  113. }
  114. if (typeof data.expand !== 'undefined' && data.expand !== null) {
  115. query.withGraphFetched('[' + data.expand.join(', ') + ']');
  116. }
  117. return query.then(utils.omitRow(omissions()));
  118. })
  119. .then((row) => {
  120. if (!row || !row.id) {
  121. throw new error.ItemNotFoundError(data.id);
  122. }
  123. // Custom omissions
  124. if (typeof data.omit !== 'undefined' && data.omit !== null) {
  125. row = _.omit(row, data.omit);
  126. }
  127. return row;
  128. });
  129. },
  130. /**
  131. * @param {Access} access
  132. * @param {Object} data
  133. * @param {Number} data.id
  134. * @param {String} [data.reason]
  135. * @returns {Promise}
  136. */
  137. delete: (access, data) => {
  138. return access.can('streams:delete', data.id)
  139. .then(() => {
  140. return internalStream.get(access, {id: data.id});
  141. })
  142. .then((row) => {
  143. if (!row || !row.id) {
  144. throw new error.ItemNotFoundError(data.id);
  145. }
  146. return streamModel
  147. .query()
  148. .where('id', row.id)
  149. .patch({
  150. is_deleted: 1
  151. })
  152. .then(() => {
  153. // Delete Nginx Config
  154. return internalNginx.deleteConfig('stream', row)
  155. .then(() => {
  156. return internalNginx.reload();
  157. });
  158. })
  159. .then(() => {
  160. // Add to audit log
  161. return internalAuditLog.add(access, {
  162. action: 'deleted',
  163. object_type: 'stream',
  164. object_id: row.id,
  165. meta: _.omit(row, omissions())
  166. });
  167. });
  168. })
  169. .then(() => {
  170. return true;
  171. });
  172. },
  173. /**
  174. * @param {Access} access
  175. * @param {Object} data
  176. * @param {Number} data.id
  177. * @param {String} [data.reason]
  178. * @returns {Promise}
  179. */
  180. enable: (access, data) => {
  181. return access.can('streams:update', data.id)
  182. .then(() => {
  183. return internalStream.get(access, {
  184. id: data.id,
  185. expand: ['owner']
  186. });
  187. })
  188. .then((row) => {
  189. if (!row || !row.id) {
  190. throw new error.ItemNotFoundError(data.id);
  191. } else if (row.enabled) {
  192. throw new error.ValidationError('Host is already enabled');
  193. }
  194. row.enabled = 1;
  195. return streamModel
  196. .query()
  197. .where('id', row.id)
  198. .patch({
  199. enabled: 1
  200. })
  201. .then(() => {
  202. // Configure nginx
  203. return internalNginx.configure(streamModel, 'stream', row);
  204. })
  205. .then(() => {
  206. // Add to audit log
  207. return internalAuditLog.add(access, {
  208. action: 'enabled',
  209. object_type: 'stream',
  210. object_id: row.id,
  211. meta: _.omit(row, omissions())
  212. });
  213. });
  214. })
  215. .then(() => {
  216. return true;
  217. });
  218. },
  219. /**
  220. * @param {Access} access
  221. * @param {Object} data
  222. * @param {Number} data.id
  223. * @param {String} [data.reason]
  224. * @returns {Promise}
  225. */
  226. disable: (access, data) => {
  227. return access.can('streams:update', data.id)
  228. .then(() => {
  229. return internalStream.get(access, {id: data.id});
  230. })
  231. .then((row) => {
  232. if (!row || !row.id) {
  233. throw new error.ItemNotFoundError(data.id);
  234. } else if (!row.enabled) {
  235. throw new error.ValidationError('Host is already disabled');
  236. }
  237. row.enabled = 0;
  238. return streamModel
  239. .query()
  240. .where('id', row.id)
  241. .patch({
  242. enabled: 0
  243. })
  244. .then(() => {
  245. // Delete Nginx Config
  246. return internalNginx.deleteConfig('stream', row)
  247. .then(() => {
  248. return internalNginx.reload();
  249. });
  250. })
  251. .then(() => {
  252. // Add to audit log
  253. return internalAuditLog.add(access, {
  254. action: 'disabled',
  255. object_type: 'stream-host',
  256. object_id: row.id,
  257. meta: _.omit(row, omissions())
  258. });
  259. });
  260. })
  261. .then(() => {
  262. return true;
  263. });
  264. },
  265. /**
  266. * All Streams
  267. *
  268. * @param {Access} access
  269. * @param {Array} [expand]
  270. * @param {String} [search_query]
  271. * @returns {Promise}
  272. */
  273. getAll: (access, expand, search_query) => {
  274. return access.can('streams:list')
  275. .then((access_data) => {
  276. const query = streamModel
  277. .query()
  278. .where('is_deleted', 0)
  279. .groupBy('id')
  280. .allowGraph('[owner]')
  281. .orderByRaw('CAST(incoming_port AS INTEGER) ASC');
  282. if (access_data.permission_visibility !== 'all') {
  283. query.andWhere('owner_user_id', access.token.getUserId(1));
  284. }
  285. // Query is used for searching
  286. if (typeof search_query === 'string' && search_query.length > 0) {
  287. query.where(function () {
  288. this.where(castJsonIfNeed('incoming_port'), 'like', `%${search_query}%`);
  289. });
  290. }
  291. if (typeof expand !== 'undefined' && expand !== null) {
  292. query.withGraphFetched('[' + expand.join(', ') + ']');
  293. }
  294. return query.then(utils.omitRows(omissions()));
  295. });
  296. },
  297. /**
  298. * Report use
  299. *
  300. * @param {Number} user_id
  301. * @param {String} visibility
  302. * @returns {Promise}
  303. */
  304. getCount: (user_id, visibility) => {
  305. const query = streamModel
  306. .query()
  307. .count('id AS count')
  308. .where('is_deleted', 0);
  309. if (visibility !== 'all') {
  310. query.andWhere('owner_user_id', user_id);
  311. }
  312. return query.first()
  313. .then((row) => {
  314. return parseInt(row.count, 10);
  315. });
  316. }
  317. };
  318. module.exports = internalStream;