Program.Msmq.cs 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. using System;
  2. using System.Drawing;
  3. using System.Messaging;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Linq;
  7. namespace RxMouseClient
  8. {
  9. partial class Program
  10. {
  11. static IObservable<Point> Msmq(string srv)
  12. {
  13. return FromQueue<Point>(srv + "\\Private$\\MouseService");
  14. }
  15. static IObservable<T> FromQueue<T>(string serverQueue)
  16. {
  17. return Observable.Create<T>(observer =>
  18. {
  19. var responseQueue = Environment.MachineName + "\\Private$\\" + Guid.NewGuid().ToString();
  20. var queue = MessageQueue.Create(responseQueue);
  21. var frm = new System.Messaging.BinaryMessageFormatter();
  22. var srv = new MessageQueue(serverQueue);
  23. srv.Formatter = frm;
  24. queue.Formatter = frm;
  25. srv.Send("S " + responseQueue);
  26. var loop = NewThreadScheduler.Default.ScheduleLongRunning(cancel =>
  27. {
  28. while (!cancel.IsDisposed)
  29. {
  30. var msg = queue.Receive();
  31. observer.OnNext((T)msg.Body);
  32. }
  33. });
  34. return new CompositeDisposable(
  35. loop,
  36. Disposable.Create(() =>
  37. {
  38. srv.Send("D " + responseQueue);
  39. MessageQueue.Delete(responseQueue);
  40. })
  41. );
  42. });
  43. }
  44. }
  45. }