stream.js 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. const _ = require('lodash');
  2. const error = require('../lib/error');
  3. const utils = require('../lib/utils');
  4. const streamModel = require('../models/stream');
  5. const internalNginx = require('./nginx');
  6. const internalAuditLog = require('./audit-log');
  /**
   * Names of the row fields this module strips from stream rows before
   * returning them or writing them to the audit log.
   *
   * A new array is built on every call, so callers may mutate the result.
   *
   * @returns {Array<String>}
   */
  function omissions() {
    const hiddenFields = ['is_deleted'];
    return hiddenFields;
  }
  /**
   * Internal operations on stream records.
   *
   * Every operation that takes an `access` object first asks it for the
   * relevant `streams:*` permission; the database, nginx and audit-log steps
   * only run once that permission promise resolves. Deleted streams are kept
   * in the table with `is_deleted` set and are filtered out of every read.
   */
  const internalStream = {

    /**
     * Inserts a new stream, generates its nginx configuration and records a
     * 'created' audit-log entry.
     *
     * Note: `data` is modified in place (`owner_user_id` is set and `meta`
     * defaults to `{}`), and that same object is stored as the audit-log meta.
     *
     * @param   {Access}  access
     * @param   {Object}  data
     * @returns {Promise} resolves with the stored row, re-fetched with its owner expanded
     */
    create: (access, data) => {
      return access.can('streams:create', data)
        .then((/*access_data*/) => {
          // TODO: At this point the existing ports should have been checked
          data.owner_user_id = access.token.getUserId(1);

          if (typeof data.meta === 'undefined') {
            data.meta = {};
          }

          return streamModel
            .query()
            .insertAndFetch(data)
            .then(utils.omitRow(omissions()));
        })
        .then((row) => {
          // Configure nginx, then re-read the row so the caller gets the owner relation
          return internalNginx.configure(streamModel, 'stream', row)
            .then(() => internalStream.get(access, {id: row.id, expand: ['owner']}));
        })
        .then((row) => {
          // Add to audit log
          return internalAuditLog.add(access, {
            action:      'created',
            object_type: 'stream',
            object_id:   row.id,
            meta:        data
          })
            .then(() => row);
        });
    },

    /**
     * Patches an existing stream, regenerates its nginx configuration and
     * records an 'updated' audit-log entry.
     *
     * The stream is loaded through `get` first, so a missing, deleted or
     * non-visible stream rejects with ItemNotFoundError before anything is
     * written.
     *
     * @param  {Access}  access
     * @param  {Object}  data
     * @param  {Number}  data.id
     * @return {Promise} resolves with the updated row, re-fetched with its owner expanded
     */
    update: (access, data) => {
      return access.can('streams:update', data.id)
        .then((/*access_data*/) => {
          // TODO: at this point the existing streams should have been checked
          return internalStream.get(access, {id: data.id});
        })
        .then((row) => {
          if (row.id !== data.id) {
            // Sanity check that something crazy hasn't happened
            throw new error.InternalValidationError(`Stream could not be updated, IDs do not match: ${row.id} !== ${data.id}`);
          }

          return streamModel
            .query()
            .patchAndFetchById(row.id, data)
            .then(utils.omitRow(omissions()))
            .then((saved_row) => {
              return internalNginx.configure(streamModel, 'stream', saved_row)
                .then(() => internalStream.get(access, {id: row.id, expand: ['owner']}));
            })
            .then((saved_row) => {
              // Add to audit log
              return internalAuditLog.add(access, {
                action:      'updated',
                object_type: 'stream',
                object_id:   row.id,
                meta:        data
              })
                .then(() => saved_row);
            });
        });
    },

    /**
     * Fetches a single non-deleted stream by id.
     *
     * When the granted permission visibility is anything other than 'all',
     * the lookup is additionally restricted to streams owned by the user id
     * taken from the access token.
     *
     * @param  {Access}  access
     * @param  {Object}  [data]         treated as `{}` when omitted
     * @param  {Number}  data.id
     * @param  {Array}   [data.expand]  relations to fetch; only `owner` is allowed by the query
     * @param  {Array}   [data.omit]    extra fields to strip from the returned row
     * @return {Promise} resolves with the row; rejects with ItemNotFoundError when no row matches
     */
    get: (access, data = {}) => {
      return access.can('streams:get', data.id)
        .then((access_data) => {
          const query = streamModel
            .query()
            .where('is_deleted', 0)
            .andWhere('id', data.id)
            .allowGraph('[owner]')
            .first();

          if (access_data.permission_visibility !== 'all') {
            query.andWhere('owner_user_id', access.token.getUserId(1));
          }

          if (typeof data.expand !== 'undefined' && data.expand !== null) {
            query.withGraphFetched(`[${data.expand.join(', ')}]`);
          }

          return query.then(utils.omitRow(omissions()));
        })
        .then((row) => {
          if (!row || !row.id) {
            throw new error.ItemNotFoundError(data.id);
          }

          // Custom omissions
          if (typeof data.omit !== 'undefined' && data.omit !== null) {
            row = _.omit(row, data.omit);
          }

          return row;
        });
    },

    /**
     * Marks a stream as deleted (`is_deleted = 1`), removes its nginx
     * configuration, reloads nginx and records a 'deleted' audit-log entry.
     *
     * @param   {Access}  access
     * @param   {Object}  data
     * @param   {Number}  data.id
     * @param   {String}  [data.reason]  not read by this function
     * @returns {Promise} resolves with `true`
     */
    delete: (access, data) => {
      return access.can('streams:delete', data.id)
        .then(() => internalStream.get(access, {id: data.id}))
        .then((row) => {
          if (!row || !row.id) {
            throw new error.ItemNotFoundError(data.id);
          }

          return streamModel
            .query()
            .where('id', row.id)
            .patch({
              is_deleted: 1
            })
            .then(() => {
              // Delete Nginx Config
              return internalNginx.deleteConfig('stream', row)
                .then(() => internalNginx.reload());
            })
            .then(() => {
              // Add to audit log
              return internalAuditLog.add(access, {
                action:      'deleted',
                object_type: 'stream',
                object_id:   row.id,
                meta:        _.omit(row, omissions())
              });
            });
        })
        .then(() => true);
    },

    /**
     * Sets `enabled = 1` on a stream, regenerates its nginx configuration and
     * records an 'enabled' audit-log entry.
     *
     * @param   {Access}  access
     * @param   {Object}  data
     * @param   {Number}  data.id
     * @param   {String}  [data.reason]  not read by this function
     * @returns {Promise} resolves with `true`; rejects with ValidationError
     *                    when the stream is already enabled
     */
    enable: (access, data) => {
      return access.can('streams:update', data.id)
        .then(() => {
          return internalStream.get(access, {
            id:     data.id,
            expand: ['owner']
          });
        })
        .then((row) => {
          if (!row || !row.id) {
            throw new error.ItemNotFoundError(data.id);
          } else if (row.enabled) {
            throw new error.ValidationError('Stream is already enabled');
          }

          // Keep the in-memory row in step with the patch below: it is what
          // gets passed to nginx and stored in the audit log.
          row.enabled = 1;

          return streamModel
            .query()
            .where('id', row.id)
            .patch({
              enabled: 1
            })
            .then(() => {
              // Configure nginx
              return internalNginx.configure(streamModel, 'stream', row);
            })
            .then(() => {
              // Add to audit log
              return internalAuditLog.add(access, {
                action:      'enabled',
                object_type: 'stream',
                object_id:   row.id,
                meta:        _.omit(row, omissions())
              });
            });
        })
        .then(() => true);
    },

    /**
     * Sets `enabled = 0` on a stream, removes its nginx configuration,
     * reloads nginx and records a 'disabled' audit-log entry.
     *
     * @param   {Access}  access
     * @param   {Object}  data
     * @param   {Number}  data.id
     * @param   {String}  [data.reason]  not read by this function
     * @returns {Promise} resolves with `true`; rejects with ValidationError
     *                    when the stream is already disabled
     */
    disable: (access, data) => {
      return access.can('streams:update', data.id)
        .then(() => internalStream.get(access, {id: data.id}))
        .then((row) => {
          if (!row || !row.id) {
            throw new error.ItemNotFoundError(data.id);
          } else if (!row.enabled) {
            throw new error.ValidationError('Stream is already disabled');
          }

          // Keep the in-memory row in step with the patch below: it is what
          // gets stored in the audit log.
          row.enabled = 0;

          return streamModel
            .query()
            .where('id', row.id)
            .patch({
              enabled: 0
            })
            .then(() => {
              // Delete Nginx Config
              return internalNginx.deleteConfig('stream', row)
                .then(() => internalNginx.reload());
            })
            .then(() => {
              // Add to audit log, under the same object type as every other
              // stream action so the entries stay grouped together.
              return internalAuditLog.add(access, {
                action:      'disabled',
                object_type: 'stream',
                object_id:   row.id,
                meta:        _.omit(row, omissions())
              });
            });
        })
        .then(() => true);
    },

    /**
     * All Streams
     *
     * Lists non-deleted streams ordered by `incoming_port` ascending. When the
     * granted permission visibility is anything other than 'all', only streams
     * owned by the user id taken from the access token are returned.
     *
     * @param   {Access}  access
     * @param   {Array}   [expand]        relations to fetch; only `owner` is allowed by the query
     * @param   {String}  [search_query]  substring matched against `incoming_port`
     * @returns {Promise} resolves with the matching rows
     */
    getAll: (access, expand, search_query) => {
      return access.can('streams:list')
        .then((access_data) => {
          const query = streamModel
            .query()
            .where('is_deleted', 0)
            .groupBy('id')
            .allowGraph('[owner]')
            .orderBy('incoming_port', 'ASC');

          if (access_data.permission_visibility !== 'all') {
            query.andWhere('owner_user_id', access.token.getUserId(1));
          }

          // Query is used for searching
          if (typeof search_query === 'string') {
            // A regular function is required here: the callback uses `this`
            // to add its condition.
            query.where(function () {
              this.where('incoming_port', 'like', `%${search_query}%`);
            });
          }

          if (typeof expand !== 'undefined' && expand !== null) {
            query.withGraphFetched(`[${expand.join(', ')}]`);
          }

          return query.then(utils.omitRows(omissions()));
        });
    },

    /**
     * Report use
     *
     * Counts non-deleted streams. No permission check is performed here.
     *
     * @param   {Number}  user_id     only used when `visibility` is not 'all'
     * @param   {String}  visibility  'all' counts every stream; any other value
     *                                counts only streams owned by `user_id`
     * @returns {Promise} resolves with the count parsed as a base-10 integer
     */
    getCount: (user_id, visibility) => {
      const query = streamModel
        .query()
        .count('id as count')
        .where('is_deleted', 0);

      if (visibility !== 'all') {
        query.andWhere('owner_user_id', user_id);
      }

      return query.first()
        .then((row) => parseInt(row.count, 10));
    }
  };
  317. module.exports = internalStream;