Program.Msmq.cs 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
using System;
using System.Collections.Generic;
using System.Drawing;
using System.Messaging;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
  8. namespace RxMouseServer
  9. {
  10. partial class Program
  11. {
  12. static IObserver<Point> Msmq()
  13. {
  14. var q = "BARTDE-M6500\\Private$\\MouseService";
  15. var queue = default(MessageQueue);
  16. if (MessageQueue.Exists(q))
  17. {
  18. queue = new MessageQueue(q);
  19. }
  20. else
  21. {
  22. queue = MessageQueue.Create(q);
  23. }
  24. var format = new System.Messaging.BinaryMessageFormatter();
  25. queue.Formatter = format;
  26. var incoming = Observable.Create<string>(observer =>
  27. {
  28. return NewThreadScheduler.Default.ScheduleLongRunning(cancel =>
  29. {
  30. while (!cancel.IsDisposed)
  31. {
  32. var msg = queue.Receive();
  33. observer.OnNext((string)msg.Body);
  34. }
  35. });
  36. });
  37. var sub = new ReplaySubject<Point>();
  38. var map = new Dictionary<string, IDisposable>();
  39. incoming.Subscribe(clientQueue =>
  40. {
  41. var command = clientQueue[0];
  42. var target = clientQueue.Substring(2);
  43. switch (command)
  44. {
  45. case 'S':
  46. {
  47. var cq = new MessageQueue(target);
  48. var crm = new System.Messaging.BinaryMessageFormatter();
  49. cq.Formatter = crm;
  50. map[target] = sub.Subscribe(pt =>
  51. {
  52. cq.Send(pt);
  53. });
  54. }
  55. break;
  56. case 'D':
  57. {
  58. var d = default(IDisposable);
  59. if (map.TryGetValue(target, out d))
  60. d.Dispose();
  61. }
  62. break;
  63. default:
  64. throw new Exception("Don't know what you're talking about!");
  65. }
  66. });
  67. return sub;
  68. }
  69. }
  70. }