ServerStreamingServerCallHandlerTests.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT license.
  3. using System.Buffers;
  4. using System.IO.Pipelines;
  5. using System.Text;
  6. using System.Text.Json;
  7. using System.Xml.Linq;
  8. using Google.Api;
  9. using Google.Protobuf;
  10. using Google.Protobuf.Reflection;
  11. using Grpc.AspNetCore.Server;
  12. using Grpc.AspNetCore.Server.Model;
  13. using Grpc.Core;
  14. using Grpc.Shared;
  15. using Grpc.Shared.Server;
  16. using Grpc.Tests.Shared;
  17. using Microsoft.AspNetCore.Grpc.JsonTranscoding.Internal.CallHandlers;
  18. using Microsoft.AspNetCore.Grpc.JsonTranscoding.Internal.Json;
  19. using Microsoft.AspNetCore.Grpc.JsonTranscoding.Tests.Infrastructure;
  20. using Microsoft.AspNetCore.Testing;
  21. using Transcoding;
  22. using Xunit.Abstractions;
  23. using MethodOptions = Grpc.Shared.Server.MethodOptions;
  24. using Type = System.Type;
  25. namespace Microsoft.AspNetCore.Grpc.JsonTranscoding.Tests;
  26. public class ServerStreamingServerCallHandlerTests : LoggedTest
  27. {
  /// <summary>
  /// Creates the test class and forwards the xUnit output helper to the base class.
  /// </summary>
  /// <param name="output">Output helper handed to the base constructor.</param>
  public ServerStreamingServerCallHandlerTests(ITestOutputHelper output)
      : base(output)
  {
  }
  /// <summary>
  /// Streams two replies separated by a sync point and checks that each one can be read
  /// from the response body as its own newline-terminated JSON document.
  /// </summary>
  [Fact]
  public async Task HandleCallAsync_WriteMultipleMessages_Returned()
  {
      // Arrange
      var gate = new SyncPoint();
      var responsePipe = new Pipe();

      ServerStreamingServerMethod<JsonTranscodingGreeterService, HelloRequest, HelloReply> invoker =
          async (service, request, writer, context) =>
          {
              await writer.WriteAsync(new HelloReply { Message = $"Hello {request.Name} 1" });

              // Pause between the two writes until the test calls Continue().
              await gate.WaitToContinue();

              await writer.WriteAsync(new HelloReply { Message = $"Hello {request.Name} 2" });
          };

      // The "name" route parameter targets the HelloRequest name field.
      var nameField = HelloRequest.Descriptor.FindFieldByNumber(HelloRequest.NameFieldNumber);
      var routeParameterDescriptors = new Dictionary<string, RouteParameter>
      {
          ["name"] = CreateRouteParameter(new List<FieldDescriptor> { nameField })
      };

      var callHandler = CreateCallHandler(
          invoker,
          descriptorInfo: TestHelpers.CreateDescriptorInfo(routeParameterDescriptors: routeParameterDescriptors));

      var httpContext = TestHelpers.CreateHttpContext(bodyStream: responsePipe.Writer.AsStream());
      httpContext.Request.RouteValues["name"] = "TestName!";

      // Act
      var callTask = callHandler.HandleCallAsync(httpContext);

      // Assert
      Assert.Equal(200, httpContext.Response.StatusCode);
      Assert.Equal("application/json; charset=utf-8", httpContext.Response.ContentType);

      var firstLine = await ReadLineAsync(responsePipe.Reader).DefaultTimeout();
      using var firstJson = JsonDocument.Parse(firstLine!);
      Assert.Equal("Hello TestName! 1", firstJson.RootElement.GetProperty("message").GetString());

      // Release the handler so it can write the second message.
      await gate.WaitForSyncPoint().DefaultTimeout();
      gate.Continue();

      var secondLine = await ReadLineAsync(responsePipe.Reader).DefaultTimeout();
      using var secondJson = JsonDocument.Parse(secondLine!);
      Assert.Equal("Hello TestName! 2", secondJson.RootElement.GetProperty("message").GetString());

      await callTask.DefaultTimeout();
  }
  // Builds a RouteParameter for the given descriptor path, using a new HttpRouteVariable
  // and an empty string for the remaining constructor arguments.
  private static RouteParameter CreateRouteParameter(List<FieldDescriptor> descriptorPath)
      => new RouteParameter(descriptorPath, new HttpRouteVariable(), string.Empty);
  /// <summary>
  /// Writes one reply and then throws a plain <see cref="Exception"/> from the handler.
  /// Expects the reply line, then an error line with a generic message and the
  /// <see cref="StatusCode.Unknown"/> code, and an "ErrorExecutingServiceMethod" log entry
  /// that carries the thrown exception.
  /// </summary>
  [Fact]
  public async Task HandleCallAsync_MessageThenError_MessageThenErrorReturned()
  {
      // Arrange
      ServerStreamingServerMethod<JsonTranscodingGreeterService, HelloRequest, HelloReply> invoker =
          async (service, request, writer, context) =>
          {
              await writer.WriteAsync(new HelloReply { Message = $"Hello {request.Name} 1" });
              throw new Exception("Exception!");
          };

      var pipe = new Pipe();

      // The "name" route parameter targets the HelloRequest name field.
      var routeParameterDescriptors = new Dictionary<string, RouteParameter>
      {
          ["name"] = CreateRouteParameter(new List<FieldDescriptor>(new[] { HelloRequest.Descriptor.FindFieldByNumber(HelloRequest.NameFieldNumber) }))
      };
      var descriptorInfo = TestHelpers.CreateDescriptorInfo(routeParameterDescriptors: routeParameterDescriptors);
      var callHandler = CreateCallHandler(invoker, descriptorInfo: descriptorInfo);

      var httpContext = TestHelpers.CreateHttpContext(bodyStream: pipe.Writer.AsStream());
      httpContext.Request.RouteValues["name"] = "TestName!";

      // Act
      var callTask = callHandler.HandleCallAsync(httpContext);

      // Assert
      var messageLine = await ReadLineAsync(pipe.Reader).DefaultTimeout();
      using var messageJson = JsonDocument.Parse(messageLine!);
      Assert.Equal("Hello TestName! 1", messageJson.RootElement.GetProperty("message").GetString());

      var errorLine = await ReadLineAsync(pipe.Reader).DefaultTimeout();
      using var errorJson = JsonDocument.Parse(errorLine!);
      Assert.Equal("Exception was thrown by handler.", errorJson.RootElement.GetProperty("message").GetString());
      // Use the named status code (numeric value 2) rather than a magic number,
      // matching how the RpcException test asserts its code.
      Assert.Equal((int)StatusCode.Unknown, errorJson.RootElement.GetProperty("code").GetInt32());

      var exceptionWrite = TestSink.Writes.Single(w => w.EventId.Name == "ErrorExecutingServiceMethod");
      Assert.Equal("Error when executing service method 'TestMethodName'.", exceptionWrite.Message);
      Assert.Equal("Exception!", exceptionWrite.Exception.Message);

      await callTask.DefaultTimeout();
  }
  /// <summary>
  /// Writes one reply and then throws an <see cref="RpcException"/> with an Aborted status.
  /// Expects the reply line, then an error line carrying the status detail and code, and an
  /// "RpcConnectionError" log entry whose exception is the status' debug exception.
  /// </summary>
  [Fact]
  public async Task HandleCallAsync_MessageThenRpcException_MessageThenErrorReturned()
  {
      // Arrange
      var debugException = new Exception("Error!");
      var abortedStatus = new Status(StatusCode.Aborted, "Detail!", debugException);
      var responsePipe = new Pipe();

      ServerStreamingServerMethod<JsonTranscodingGreeterService, HelloRequest, HelloReply> invoker =
          async (service, request, writer, context) =>
          {
              await writer.WriteAsync(new HelloReply { Message = $"Hello {request.Name} 1" });
              throw new RpcException(abortedStatus);
          };

      // The "name" route parameter targets the HelloRequest name field.
      var nameField = HelloRequest.Descriptor.FindFieldByNumber(HelloRequest.NameFieldNumber);
      var routeParameterDescriptors = new Dictionary<string, RouteParameter>
      {
          ["name"] = CreateRouteParameter(new List<FieldDescriptor> { nameField })
      };

      var callHandler = CreateCallHandler(
          invoker,
          descriptorInfo: TestHelpers.CreateDescriptorInfo(routeParameterDescriptors: routeParameterDescriptors));

      var httpContext = TestHelpers.CreateHttpContext(bodyStream: responsePipe.Writer.AsStream());
      httpContext.Request.RouteValues["name"] = "TestName!";

      // Act
      var callTask = callHandler.HandleCallAsync(httpContext);

      // Assert
      var messageLine = await ReadLineAsync(responsePipe.Reader).DefaultTimeout();
      using var messageJson = JsonDocument.Parse(messageLine!);
      Assert.Equal("Hello TestName! 1", messageJson.RootElement.GetProperty("message").GetString());

      var errorLine = await ReadLineAsync(responsePipe.Reader).DefaultTimeout();
      using var errorJson = JsonDocument.Parse(errorLine!);
      var errorRoot = errorJson.RootElement;
      Assert.Equal("Detail!", errorRoot.GetProperty("message").GetString());
      Assert.Equal((int)StatusCode.Aborted, errorRoot.GetProperty("code").GetInt32());

      var loggedError = TestSink.Writes.Single(w => w.EventId.Name == "RpcConnectionError");
      Assert.Equal("Error status code 'Aborted' with detail 'Detail!' raised.", loggedError.Message);
      Assert.Equal(debugException, loggedError.Exception);

      await callTask.DefaultTimeout();
  }
  /// <summary>
  /// Fails the handler immediately with a plain <see cref="Exception"/> while
  /// <c>EnableDetailedErrors</c> is set on the service options. Expects the error line to
  /// include the exception message, the <see cref="StatusCode.Unknown"/> code, and an
  /// "ErrorExecutingServiceMethod" log entry that carries the exception.
  /// </summary>
  [Fact]
  public async Task HandleCallAsync_ErrorWithDetailedErrors_DetailedErrorResponse()
  {
      // Arrange
      // The delegate returns a plain Task, so the non-generic FromException is the right
      // factory; the former Task<HelloReply> suggested a unary-style result that never exists.
      ServerStreamingServerMethod<JsonTranscodingGreeterService, HelloRequest, HelloReply> invoker =
          (service, request, writer, context) => Task.FromException(new Exception("Exception!"));

      var pipe = new Pipe();

      // The "name" route parameter targets the HelloRequest name field.
      var routeParameterDescriptors = new Dictionary<string, RouteParameter>
      {
          ["name"] = CreateRouteParameter(new List<FieldDescriptor>(new[] { HelloRequest.Descriptor.FindFieldByNumber(HelloRequest.NameFieldNumber) }))
      };
      var descriptorInfo = TestHelpers.CreateDescriptorInfo(routeParameterDescriptors: routeParameterDescriptors);
      var serviceOptions = new GrpcServiceOptions { EnableDetailedErrors = true };
      var callHandler = CreateCallHandler(invoker, descriptorInfo: descriptorInfo, serviceOptions: serviceOptions);

      var httpContext = TestHelpers.CreateHttpContext(bodyStream: pipe.Writer.AsStream());
      httpContext.Request.RouteValues["name"] = "TestName!";

      // Act
      var callTask = callHandler.HandleCallAsync(httpContext);

      // Assert
      var line = await ReadLineAsync(pipe.Reader).DefaultTimeout();
      using var responseJson = JsonDocument.Parse(line!);
      Assert.Equal("Exception was thrown by handler. Exception: Exception!", responseJson.RootElement.GetProperty("message").GetString());
      // Use the named status code (numeric value 2) rather than a magic number.
      Assert.Equal((int)StatusCode.Unknown, responseJson.RootElement.GetProperty("code").GetInt32());

      var exceptionWrite = TestSink.Writes.Single(w => w.EventId.Name == "ErrorExecutingServiceMethod");
      Assert.Equal("Error when executing service method 'TestMethodName'.", exceptionWrite.Message);
      Assert.Equal("Exception!", exceptionWrite.Exception.Message);

      await callTask.DefaultTimeout();
  }
  /// <summary>
  /// Streams two <see cref="HttpBody"/> responses with an "application/xml" content type,
  /// separated by a sync point, and checks that the response content type is taken from the
  /// body and that each payload can be read back as its own line of XML.
  /// </summary>
  [Fact]
  public async Task HandleCallAsync_HttpBody_WriteMultipleMessages_Returned()
  {
      // Arrange
      var gate = new SyncPoint();
      var responsePipe = new Pipe();

      // Wraps an XML string in an HttpBody with UTF-8 encoded data.
      static HttpBody CreateXmlBody(string xml) => new HttpBody
      {
          ContentType = "application/xml",
          Data = ByteString.CopyFrom(Encoding.UTF8.GetBytes(xml))
      };

      ServerStreamingServerMethod<JsonTranscodingGreeterService, HelloRequest, HttpBody> invoker =
          async (service, request, writer, context) =>
          {
              await writer.WriteAsync(CreateXmlBody($"<message>Hello {request.Name} 1</message>"));

              // Pause between the two writes until the test calls Continue().
              await gate.WaitToContinue();

              await writer.WriteAsync(CreateXmlBody($"<message>Hello {request.Name} 2</message>"));
          };

      // The "name" route parameter targets the HelloRequest name field.
      var nameField = HelloRequest.Descriptor.FindFieldByNumber(HelloRequest.NameFieldNumber);
      var routeParameterDescriptors = new Dictionary<string, RouteParameter>
      {
          ["name"] = CreateRouteParameter(new List<FieldDescriptor> { nameField })
      };

      var httpBodyMethod = CreateServiceMethod("HttpResponseBody", HelloRequest.Parser, HttpBody.Parser);
      var callHandler = CreateCallHandler(
          invoker,
          httpBodyMethod,
          descriptorInfo: TestHelpers.CreateDescriptorInfo(routeParameterDescriptors: routeParameterDescriptors));

      var httpContext = TestHelpers.CreateHttpContext(bodyStream: responsePipe.Writer.AsStream());
      httpContext.Request.RouteValues["name"] = "TestName!";

      // Act
      var callTask = callHandler.HandleCallAsync(httpContext);

      // Assert
      Assert.Equal(200, httpContext.Response.StatusCode);
      Assert.Equal("application/xml", httpContext.Response.ContentType);

      var firstLine = await ReadLineAsync(responsePipe.Reader).DefaultTimeout();
      var firstXml = XDocument.Parse(firstLine!);
      Assert.Equal("Hello TestName! 1", (string)firstXml.Element("message")!);

      // Release the handler so it can write the second body.
      await gate.WaitForSyncPoint().DefaultTimeout();
      gate.Continue();

      var secondLine = await ReadLineAsync(responsePipe.Reader).DefaultTimeout();
      var secondXml = XDocument.Parse(secondLine!);
      Assert.Equal("Hello TestName! 2", (string)secondXml.Element("message")!);

      await callTask.DefaultTimeout();
  }
  /// <summary>
  /// Reads from <paramref name="pipeReader"/> until a '\n'-terminated line is available.
  /// </summary>
  /// <param name="pipeReader">Reader to pull bytes from.</param>
  /// <returns>
  /// The line text without its trailing '\n', or <c>null</c> when the reader completes
  /// before a full line has been received.
  /// </returns>
  public async Task<string?> ReadLineAsync(PipeReader pipeReader)
  {
      ReadResult readResult;
      do
      {
          readResult = await pipeReader.ReadAsync();
          var pending = readResult.Buffer;

          var line = ReadLine(ref pending, out var lineEnd);
          if (line is not null)
          {
              // Consume through the end of the line; anything after it stays in the pipe.
              pipeReader.AdvanceTo(lineEnd, lineEnd);
              return line;
          }

          // No complete line yet: consume nothing but mark the whole buffer as examined.
          pipeReader.AdvanceTo(pending.Start, pending.End);
      }
      while (!readResult.IsCompleted);

      return null;
  }
  /// <summary>
  /// Tries to extract the first '\n'-terminated line from <paramref name="buffer"/>.
  /// </summary>
  /// <param name="buffer">
  /// Bytes to scan. On success it is replaced by the slice that follows the line;
  /// otherwise it is left unchanged.
  /// </param>
  /// <param name="end">
  /// On success, the reader position after the line has been read; otherwise <c>default</c>.
  /// </param>
  /// <returns>The UTF-8 decoded line without the '\n', or <c>null</c> if no '\n' was found.</returns>
  private static string? ReadLine(ref ReadOnlySequence<byte> buffer, out SequencePosition end)
  {
      var sequenceReader = new SequenceReader<byte>(buffer);

      if (!sequenceReader.TryReadTo(out ReadOnlySequence<byte> lineBytes, (byte)'\n'))
      {
          end = default;
          return null;
      }

      end = sequenceReader.Position;
      buffer = buffer.Slice(end);
      return Encoding.UTF8.GetString(lineBytes);
  }
  /// <summary>
  /// Creates a HelloRequest/HelloReply call handler whose gRPC method is named "TestMethodName".
  /// All optional arguments are passed straight through to the generic overload.
  /// </summary>
  private ServerStreamingServerCallHandler<JsonTranscodingGreeterService, HelloRequest, HelloReply> CreateCallHandler(
      ServerStreamingServerMethod<JsonTranscodingGreeterService, HelloRequest, HelloReply> invoker,
      CallHandlerDescriptorInfo? descriptorInfo = null,
      List<(Type Type, object[] Args)>? interceptors = null,
      GrpcJsonTranscodingOptions? jsonTranscodingOptions = null,
      GrpcServiceOptions? serviceOptions = null)
  {
      var testMethod = CreateServiceMethod("TestMethodName", HelloRequest.Parser, HelloReply.Parser);

      return CreateCallHandler(invoker, testMethod, descriptorInfo, interceptors, jsonTranscodingOptions, serviceOptions);
  }
  /// <summary>
  /// Builds a <c>ServerStreamingServerCallHandler</c> for the given invoker and method.
  /// </summary>
  /// <param name="invoker">Delegate that implements the server-streaming method.</param>
  /// <param name="method">gRPC method description passed to the method invoker.</param>
  /// <param name="descriptorInfo">Descriptor info; <c>TestHelpers.CreateDescriptorInfo()</c> is used when null.</param>
  /// <param name="interceptors">Optional interceptor registrations added to the service options.</param>
  /// <param name="jsonTranscodingOptions">Optional source of JSON settings and type registry.</param>
  /// <param name="serviceOptions">Service options; a new instance is used when null.</param>
  private ServerStreamingServerCallHandler<JsonTranscodingGreeterService, TRequest, TResponse> CreateCallHandler<TRequest, TResponse>(
      ServerStreamingServerMethod<JsonTranscodingGreeterService, TRequest, TResponse> invoker,
      Method<TRequest, TResponse> method,
      CallHandlerDescriptorInfo? descriptorInfo = null,
      List<(Type Type, object[] Args)>? interceptors = null,
      GrpcJsonTranscodingOptions? jsonTranscodingOptions = null,
      GrpcServiceOptions? serviceOptions = null)
      where TRequest : class, IMessage<TRequest>
      where TResponse : class, IMessage<TResponse>
  {
      var options = serviceOptions ?? new GrpcServiceOptions();

      if (interceptors is not null)
      {
          foreach (var (interceptorType, interceptorArgs) in interceptors)
          {
              options.Interceptors.Add(interceptorType, interceptorArgs ?? Array.Empty<object>());
          }
      }

      var methodInvoker = new ServerStreamingServerMethodInvoker<JsonTranscodingGreeterService, TRequest, TResponse>(
          invoker,
          method,
          MethodOptions.Create(new[] { options }),
          new TestGrpcServiceActivator<JsonTranscodingGreeterService>());

      // Fall back to non-indented JSON and an empty type registry when no options are given.
      var settings = jsonTranscodingOptions?.JsonSettings ?? new GrpcJsonSettings { WriteIndented = false };
      var registry = jsonTranscodingOptions?.TypeRegistry ?? TypeRegistry.Empty;
      var jsonContext = new JsonContext(settings, registry);

      var resolvedDescriptorInfo = descriptorInfo ?? TestHelpers.CreateDescriptorInfo();
      var serializerOptions = JsonConverterHelper.CreateSerializerOptions(jsonContext);

      return new ServerStreamingServerCallHandler<JsonTranscodingGreeterService, TRequest, TResponse>(
          methodInvoker,
          LoggerFactory,
          resolvedDescriptorInfo,
          serializerOptions);
  }
  /// <summary>
  /// Creates a marshaller that serializes a message with <c>ToByteArray()</c> and
  /// deserializes it with <paramref name="parser"/>.
  /// </summary>
  /// <param name="parser">Parser used to turn received bytes back into a message.</param>
  public static Marshaller<TMessage> GetMarshaller<TMessage>(MessageParser<TMessage> parser)
      where TMessage : IMessage<TMessage>
  {
      Func<TMessage, byte[]> serialize = message => message.ToByteArray();
      Func<byte[], TMessage> deserialize = payload => parser.ParseFrom(payload);

      return Marshallers.Create<TMessage>(serialize, deserialize);
  }
  /// <summary>
  /// Shared HelloRequest/HelloReply method description named "MethodName".
  /// </summary>
  public static readonly Method<HelloRequest, HelloReply> ServiceMethod = CreateServiceMethod("MethodName", HelloRequest.Parser, HelloReply.Parser);

  /// <summary>
  /// Creates a gRPC method description on service "ServiceName" with marshallers built
  /// from the supplied parsers.
  /// </summary>
  /// <param name="methodName">Name of the method.</param>
  /// <param name="requestParser">Parser used to build the request marshaller.</param>
  /// <param name="responseParser">Parser used to build the response marshaller.</param>
  /// <returns>A method description typed as server streaming.</returns>
  public static Method<TRequest, TResponse> CreateServiceMethod<TRequest, TResponse>(string methodName, MessageParser<TRequest> requestParser, MessageParser<TResponse> responseParser)
      where TRequest : IMessage<TRequest>
      where TResponse : IMessage<TResponse>
  {
      // Every handler and invoker built in this class is server streaming, so the method
      // description should say so rather than claim to be unary.
      return new Method<TRequest, TResponse>(
          MethodType.ServerStreaming,
          "ServiceName",
          methodName,
          GetMarshaller(requestParser),
          GetMarshaller(responseParser));
  }
  299. }