stream.js 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. import _ from "lodash";
  2. import errs from "../lib/error.js";
  3. import { castJsonIfNeed } from "../lib/helpers.js";
  4. import utils from "../lib/utils.js";
  5. import streamModel from "../models/stream.js";
  6. import internalAuditLog from "./audit-log.js";
  7. import internalCertificate from "./certificate.js";
  8. import internalHost from "./host.js";
  9. import internalNginx from "./nginx.js";
  10. const omissions = () => {
  11. return ["is_deleted", "owner.is_deleted", "certificate.is_deleted"];
  12. };
  13. const internalStream = {
  14. /**
  15. * @param {Access} access
  16. * @param {Object} data
  17. * @returns {Promise}
  18. */
  19. create: (access, data) => {
  20. const create_certificate = data.certificate_id === "new";
  21. if (create_certificate) {
  22. delete data.certificate_id;
  23. }
  24. return access
  25. .can("streams:create", data)
  26. .then((/*access_data*/) => {
  27. // TODO: At this point the existing ports should have been checked
  28. data.owner_user_id = access.token.getUserId(1);
  29. if (typeof data.meta === "undefined") {
  30. data.meta = {};
  31. }
  32. // streams aren't routed by domain name so don't store domain names in the DB
  33. const data_no_domains = structuredClone(data);
  34. delete data_no_domains.domain_names;
  35. return streamModel.query().insertAndFetch(data_no_domains).then(utils.omitRow(omissions()));
  36. })
  37. .then((row) => {
  38. if (create_certificate) {
  39. return internalCertificate
  40. .createQuickCertificate(access, data)
  41. .then((cert) => {
  42. // update host with cert id
  43. return internalStream.update(access, {
  44. id: row.id,
  45. certificate_id: cert.id,
  46. });
  47. })
  48. .then(() => {
  49. return row;
  50. });
  51. }
  52. return row;
  53. })
  54. .then((row) => {
  55. // re-fetch with cert
  56. return internalStream.get(access, {
  57. id: row.id,
  58. expand: ["certificate", "owner"],
  59. });
  60. })
  61. .then((row) => {
  62. // Configure nginx
  63. return internalNginx.configure(streamModel, "stream", row).then(() => {
  64. return row;
  65. });
  66. })
  67. .then((row) => {
  68. // Add to audit log
  69. return internalAuditLog
  70. .add(access, {
  71. action: "created",
  72. object_type: "stream",
  73. object_id: row.id,
  74. meta: data,
  75. })
  76. .then(() => {
  77. return row;
  78. });
  79. });
  80. },
  81. /**
  82. * @param {Access} access
  83. * @param {Object} data
  84. * @param {Number} data.id
  85. * @return {Promise}
  86. */
  87. update: (access, data) => {
  88. let thisData = data;
  89. const create_certificate = thisData.certificate_id === "new";
  90. if (create_certificate) {
  91. delete thisData.certificate_id;
  92. }
  93. return access
  94. .can("streams:update", thisData.id)
  95. .then((/*access_data*/) => {
  96. // TODO: at this point the existing streams should have been checked
  97. return internalStream.get(access, { id: thisData.id });
  98. })
  99. .then((row) => {
  100. if (row.id !== thisData.id) {
  101. // Sanity check that something crazy hasn't happened
  102. throw new errs.InternalValidationError(
  103. `Stream could not be updated, IDs do not match: ${row.id} !== ${thisData.id}`,
  104. );
  105. }
  106. if (create_certificate) {
  107. return internalCertificate
  108. .createQuickCertificate(access, {
  109. domain_names: thisData.domain_names || row.domain_names,
  110. meta: _.assign({}, row.meta, thisData.meta),
  111. })
  112. .then((cert) => {
  113. // update host with cert id
  114. thisData.certificate_id = cert.id;
  115. })
  116. .then(() => {
  117. return row;
  118. });
  119. }
  120. return row;
  121. })
  122. .then((row) => {
  123. // Add domain_names to the data in case it isn't there, so that the audit log renders correctly. The order is important here.
  124. thisData = _.assign(
  125. {},
  126. {
  127. domain_names: row.domain_names,
  128. },
  129. thisData,
  130. );
  131. return streamModel
  132. .query()
  133. .patchAndFetchById(row.id, thisData)
  134. .then(utils.omitRow(omissions()))
  135. .then((saved_row) => {
  136. // Add to audit log
  137. return internalAuditLog
  138. .add(access, {
  139. action: "updated",
  140. object_type: "stream",
  141. object_id: row.id,
  142. meta: thisData,
  143. })
  144. .then(() => {
  145. return saved_row;
  146. });
  147. });
  148. })
  149. .then(() => {
  150. return internalStream.get(access, { id: thisData.id, expand: ["owner", "certificate"] }).then((row) => {
  151. return internalNginx.configure(streamModel, "stream", row).then((new_meta) => {
  152. row.meta = new_meta;
  153. return _.omit(internalHost.cleanRowCertificateMeta(row), omissions());
  154. });
  155. });
  156. });
  157. },
  158. /**
  159. * @param {Access} access
  160. * @param {Object} data
  161. * @param {Number} data.id
  162. * @param {Array} [data.expand]
  163. * @param {Array} [data.omit]
  164. * @return {Promise}
  165. */
  166. get: (access, data) => {
  167. const thisData = data || {};
  168. return access
  169. .can("streams:get", thisData.id)
  170. .then((access_data) => {
  171. const query = streamModel
  172. .query()
  173. .where("is_deleted", 0)
  174. .andWhere("id", thisData.id)
  175. .allowGraph("[owner,certificate]")
  176. .first();
  177. if (access_data.permission_visibility !== "all") {
  178. query.andWhere("owner_user_id", access.token.getUserId(1));
  179. }
  180. if (typeof thisData.expand !== "undefined" && thisData.expand !== null) {
  181. query.withGraphFetched(`[${thisData.expand.join(", ")}]`);
  182. }
  183. return query.then(utils.omitRow(omissions()));
  184. })
  185. .then((row) => {
  186. let thisRow = row;
  187. if (!thisRow || !thisRow.id) {
  188. throw new errs.ItemNotFoundError(thisData.id);
  189. }
  190. thisRow = internalHost.cleanRowCertificateMeta(thisRow);
  191. // Custom omissions
  192. if (typeof thisData.omit !== "undefined" && thisData.omit !== null) {
  193. return _.omit(thisRow, thisData.omit);
  194. }
  195. return thisRow;
  196. });
  197. },
  198. /**
  199. * @param {Access} access
  200. * @param {Object} data
  201. * @param {Number} data.id
  202. * @param {String} [data.reason]
  203. * @returns {Promise}
  204. */
  205. delete: (access, data) => {
  206. return access
  207. .can("streams:delete", data.id)
  208. .then(() => {
  209. return internalStream.get(access, { id: data.id });
  210. })
  211. .then((row) => {
  212. if (!row || !row.id) {
  213. throw new errs.ItemNotFoundError(data.id);
  214. }
  215. return streamModel
  216. .query()
  217. .where("id", row.id)
  218. .patch({
  219. is_deleted: 1,
  220. })
  221. .then(() => {
  222. // Delete Nginx Config
  223. return internalNginx.deleteConfig("stream", row).then(() => {
  224. return internalNginx.reload();
  225. });
  226. })
  227. .then(() => {
  228. // Add to audit log
  229. return internalAuditLog.add(access, {
  230. action: "deleted",
  231. object_type: "stream",
  232. object_id: row.id,
  233. meta: _.omit(row, omissions()),
  234. });
  235. });
  236. })
  237. .then(() => {
  238. return true;
  239. });
  240. },
  241. /**
  242. * @param {Access} access
  243. * @param {Object} data
  244. * @param {Number} data.id
  245. * @param {String} [data.reason]
  246. * @returns {Promise}
  247. */
  248. enable: (access, data) => {
  249. return access
  250. .can("streams:update", data.id)
  251. .then(() => {
  252. return internalStream.get(access, {
  253. id: data.id,
  254. expand: ["certificate", "owner"],
  255. });
  256. })
  257. .then((row) => {
  258. if (!row || !row.id) {
  259. throw new errs.ItemNotFoundError(data.id);
  260. }
  261. if (row.enabled) {
  262. throw new errs.ValidationError("Stream is already enabled");
  263. }
  264. row.enabled = 1;
  265. return streamModel
  266. .query()
  267. .where("id", row.id)
  268. .patch({
  269. enabled: 1,
  270. })
  271. .then(() => {
  272. // Configure nginx
  273. return internalNginx.configure(streamModel, "stream", row);
  274. })
  275. .then(() => {
  276. // Add to audit log
  277. return internalAuditLog.add(access, {
  278. action: "enabled",
  279. object_type: "stream",
  280. object_id: row.id,
  281. meta: _.omit(row, omissions()),
  282. });
  283. });
  284. })
  285. .then(() => {
  286. return true;
  287. });
  288. },
  289. /**
  290. * @param {Access} access
  291. * @param {Object} data
  292. * @param {Number} data.id
  293. * @param {String} [data.reason]
  294. * @returns {Promise}
  295. */
  296. disable: (access, data) => {
  297. return access
  298. .can("streams:update", data.id)
  299. .then(() => {
  300. return internalStream.get(access, { id: data.id });
  301. })
  302. .then((row) => {
  303. if (!row || !row.id) {
  304. throw new errs.ItemNotFoundError(data.id);
  305. }
  306. if (!row.enabled) {
  307. throw new errs.ValidationError("Stream is already disabled");
  308. }
  309. row.enabled = 0;
  310. return streamModel
  311. .query()
  312. .where("id", row.id)
  313. .patch({
  314. enabled: 0,
  315. })
  316. .then(() => {
  317. // Delete Nginx Config
  318. return internalNginx.deleteConfig("stream", row).then(() => {
  319. return internalNginx.reload();
  320. });
  321. })
  322. .then(() => {
  323. // Add to audit log
  324. return internalAuditLog.add(access, {
  325. action: "disabled",
  326. object_type: "stream",
  327. object_id: row.id,
  328. meta: _.omit(row, omissions()),
  329. });
  330. });
  331. })
  332. .then(() => {
  333. return true;
  334. });
  335. },
  336. /**
  337. * All Streams
  338. *
  339. * @param {Access} access
  340. * @param {Array} [expand]
  341. * @param {String} [search_query]
  342. * @returns {Promise}
  343. */
  344. getAll: (access, expand, search_query) => {
  345. return access
  346. .can("streams:list")
  347. .then((access_data) => {
  348. const query = streamModel
  349. .query()
  350. .where("is_deleted", 0)
  351. .groupBy("id")
  352. .allowGraph("[owner,certificate]")
  353. .orderBy("incoming_port", "ASC");
  354. if (access_data.permission_visibility !== "all") {
  355. query.andWhere("owner_user_id", access.token.getUserId(1));
  356. }
  357. // Query is used for searching
  358. if (typeof search_query === "string" && search_query.length > 0) {
  359. query.where(function () {
  360. this.where(castJsonIfNeed("incoming_port"), "like", `%${search_query}%`);
  361. });
  362. }
  363. if (typeof expand !== "undefined" && expand !== null) {
  364. query.withGraphFetched(`[${expand.join(", ")}]`);
  365. }
  366. return query.then(utils.omitRows(omissions()));
  367. })
  368. .then((rows) => {
  369. if (typeof expand !== "undefined" && expand !== null && expand.indexOf("certificate") !== -1) {
  370. return internalHost.cleanAllRowsCertificateMeta(rows);
  371. }
  372. return rows;
  373. });
  374. },
  375. /**
  376. * Report use
  377. *
  378. * @param {Number} user_id
  379. * @param {String} visibility
  380. * @returns {Promise}
  381. */
  382. getCount: (user_id, visibility) => {
  383. const query = streamModel.query().count("id AS count").where("is_deleted", 0);
  384. if (visibility !== "all") {
  385. query.andWhere("owner_user_id", user_id);
  386. }
  387. return query.first().then((row) => {
  388. return Number.parseInt(row.count, 10);
  389. });
  390. },
  391. };
  392. export default internalStream;