grpc.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package eventsearcher
  2. import (
  3. "context"
  4. "time"
  5. "github.com/drakkan/sftpgo/v2/sdk/plugin/eventsearcher/proto"
  6. )
const (
	// rpcTimeout is the deadline applied to every gRPC call issued by
	// GRPCClient.
	rpcTimeout = 20 * time.Second
)
// GRPCClient is an implementation of the Searcher interface that talks over RPC.
type GRPCClient struct {
	// client is the proto.SearcherClient every search call is forwarded to.
	client proto.SearcherClient
}
  14. // SearchFsEvents implements the Searcher interface
  15. func (c *GRPCClient) SearchFsEvents(searchFilters *FsEventSearch) ([]byte, []string, []string, error) {
  16. ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
  17. defer cancel()
  18. resp, err := c.client.SearchFsEvents(ctx, &proto.FsEventsFilter{
  19. StartTimestamp: searchFilters.StartTimestamp,
  20. EndTimestamp: searchFilters.EndTimestamp,
  21. Actions: searchFilters.Actions,
  22. Username: searchFilters.Username,
  23. Ip: searchFilters.IP,
  24. SshCmd: searchFilters.SSHCmd,
  25. Protocols: searchFilters.Protocols,
  26. InstanceIds: searchFilters.InstanceIDs,
  27. Statuses: searchFilters.Statuses,
  28. Limit: int32(searchFilters.Limit),
  29. ExcludeIds: searchFilters.ExcludeIDs,
  30. FsProvider: int32(searchFilters.FsProvider),
  31. Bucket: searchFilters.Bucket,
  32. Endpoint: searchFilters.Endpoint,
  33. Order: proto.FsEventsFilter_Order(searchFilters.Order),
  34. })
  35. if err != nil {
  36. return nil, nil, nil, err
  37. }
  38. return resp.Data, resp.SameTsAtStart, resp.SameTsAtEnd, nil
  39. }
  40. // SearchProviderEvents implements the Searcher interface
  41. func (c *GRPCClient) SearchProviderEvents(searchFilters *ProviderEventSearch) ([]byte, []string, []string, error) {
  42. ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
  43. defer cancel()
  44. resp, err := c.client.SearchProviderEvents(ctx, &proto.ProviderEventsFilter{
  45. StartTimestamp: searchFilters.StartTimestamp,
  46. EndTimestamp: searchFilters.EndTimestamp,
  47. Actions: searchFilters.Actions,
  48. Username: searchFilters.Username,
  49. Ip: searchFilters.IP,
  50. ObjectTypes: searchFilters.ObjectTypes,
  51. ObjectName: searchFilters.ObjectName,
  52. InstanceIds: searchFilters.InstanceIDs,
  53. Limit: int32(searchFilters.Limit),
  54. ExcludeIds: searchFilters.ExcludeIDs,
  55. Order: proto.ProviderEventsFilter_Order(searchFilters.Order),
  56. })
  57. if err != nil {
  58. return nil, nil, nil, err
  59. }
  60. return resp.Data, resp.SameTsAtStart, resp.SameTsAtEnd, nil
  61. }
// GRPCServer defines the gRPC server that GRPCClient talks to.
type GRPCServer struct {
	// Impl is the Searcher implementation that incoming requests are
	// delegated to.
	Impl Searcher
}
  66. // SearchFsEvents implements the server side fs events search method
  67. func (s *GRPCServer) SearchFsEvents(ctx context.Context, req *proto.FsEventsFilter) (*proto.SearchResponse, error) {
  68. responseData, sameTsAtStart, sameTsAtEnd, err := s.Impl.SearchFsEvents(&FsEventSearch{
  69. CommonSearchParams: CommonSearchParams{
  70. StartTimestamp: req.StartTimestamp,
  71. EndTimestamp: req.EndTimestamp,
  72. Actions: req.Actions,
  73. Username: req.Username,
  74. IP: req.Ip,
  75. InstanceIDs: req.InstanceIds,
  76. Limit: int(req.Limit),
  77. ExcludeIDs: req.ExcludeIds,
  78. Order: int(req.Order),
  79. },
  80. SSHCmd: req.SshCmd,
  81. Protocols: req.Protocols,
  82. Statuses: req.Statuses,
  83. FsProvider: int(req.FsProvider),
  84. Bucket: req.Bucket,
  85. Endpoint: req.Endpoint,
  86. })
  87. return &proto.SearchResponse{
  88. Data: responseData,
  89. SameTsAtStart: sameTsAtStart,
  90. SameTsAtEnd: sameTsAtEnd,
  91. }, err
  92. }
  93. // SearchProviderEvents implement the server side provider events search method
  94. func (s *GRPCServer) SearchProviderEvents(ctx context.Context, req *proto.ProviderEventsFilter) (*proto.SearchResponse, error) {
  95. responseData, sameTsAtStart, sameTsAtEnd, err := s.Impl.SearchProviderEvents(&ProviderEventSearch{
  96. CommonSearchParams: CommonSearchParams{
  97. StartTimestamp: req.StartTimestamp,
  98. EndTimestamp: req.EndTimestamp,
  99. Actions: req.Actions,
  100. Username: req.Username,
  101. IP: req.Ip,
  102. InstanceIDs: req.InstanceIds,
  103. Limit: int(req.Limit),
  104. ExcludeIDs: req.ExcludeIds,
  105. Order: int(req.Order),
  106. },
  107. ObjectTypes: req.ObjectTypes,
  108. ObjectName: req.ObjectName,
  109. })
  110. return &proto.SearchResponse{
  111. Data: responseData,
  112. SameTsAtStart: sameTsAtStart,
  113. SameTsAtEnd: sameTsAtEnd,
  114. }, err
  115. }