MassTransit, Saga and RabbitMQ for implementing a process manager

We were once tasked with automating various workflows in a large company. At launch, this meant integrating about 10 systems, and everything had to be connected asynchronously, scalably and reliably.


Put simply, such a process is a sequence of actions across different systems that cannot be fully automated because it requires human participation: for example, choosing a specific action or doing a basic reconciliation before the process can move on to the next stage.


To solve this problem, we chose a messaging architecture built on a service bus, and MassTransit with its Saga, together with RabbitMQ, suited us perfectly.



What is Saga?


Saga is an implementation of the Process Manager pattern from the book Enterprise Integration Patterns, which lets you describe a process as a finite state machine. An incoming event triggers the Saga, which then performs a sequence of actions. At any stage, however, a human decision may be required. In that case the Saga creates a task in the tracker and “falls asleep” for an indefinite period, waiting for new events.

Saga is built on Automatonymous. It is described declaratively in a class that inherits from MassTransitStateMachine<T>. For a Saga, you have to describe all of its states, the events it accepts, and the actions performed when particular events occur. The current state is stored in the database.
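To illustrate the finite-state-machine idea independently of MassTransit, here is a minimal dependency-free sketch in plain C#. It is not how Automatonymous works internally: the `WorkflowProcess` class, the `WorkflowEvent` enum and the `Handle` method are hypothetical, and only the state and event names mirror the ones used in this article. The transition table records which event is accepted in which state; in this sketch an unexpected event leaves the state unchanged and `Handle` returns false.

```csharp
using System.Collections.Generic;

// Hypothetical, dependency-free sketch of a process manager as a finite state machine.
// State and event names mirror the article; this is NOT the Automatonymous API.
public enum WorkflowState
{
    Initial, AwaitingTaskCreated, AwaitingTaskTakedToWork, AwaitingDecisionAboutTask, Rejected, Final
}

public enum WorkflowEvent
{
    StartWorkflowCommandReceived, TaskCreated, TaskTakedToWork, TaskApproved, TaskDeclined
}

public sealed class WorkflowProcess
{
    // (current state, incoming event) -> next state
    private static readonly Dictionary<(WorkflowState, WorkflowEvent), WorkflowState> Transitions = new()
    {
        [(WorkflowState.Initial, WorkflowEvent.StartWorkflowCommandReceived)] = WorkflowState.AwaitingTaskCreated,
        [(WorkflowState.AwaitingTaskCreated, WorkflowEvent.TaskCreated)] = WorkflowState.AwaitingTaskTakedToWork,
        [(WorkflowState.AwaitingTaskTakedToWork, WorkflowEvent.TaskTakedToWork)] = WorkflowState.AwaitingDecisionAboutTask,
        [(WorkflowState.AwaitingDecisionAboutTask, WorkflowEvent.TaskApproved)] = WorkflowState.Final,
        [(WorkflowState.AwaitingDecisionAboutTask, WorkflowEvent.TaskDeclined)] = WorkflowState.Rejected,
    };

    public WorkflowState Current { get; private set; } = WorkflowState.Initial;

    // Applies an event; returns false and keeps the state if the event is not expected now.
    public bool Handle(WorkflowEvent e)
    {
        if (!Transitions.TryGetValue((Current, e), out var next))
            return false;

        Current = next;
        return true;
    }
}
```

Starting the process moves it to `AwaitingTaskCreated`; a `TaskApproved` event arriving at that point is rejected because no transition is defined for it.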


To begin with, we describe all the states and events of the Saga and give them clear names. It looks like this:


```csharp
using Automatonymous;

public sealed partial class StateMachine
{
    // States the Saga can be in
    public State AwaitingTaskCreated { get; set; }
    public State AwaitingTaskTakedToWork { get; set; }
    public State AwaitingDecisionAboutTask { get; set; }
    public State Rejected { get; set; }

    // Events the Saga reacts to
    public Event<IStartWorkflowCommand> StartWorkflowCommandReceived { get; set; }
    public Event<TaskCreatedNotification> TaskCreated { get; set; }
    public Event<TaskTakedToWorkNotification> TaskTakedToWork { get; set; }
    public Event<TaskDeclinedNotification> TaskDeclined { get; set; }
    public Event<TaskApprovedNotification> TaskApproved { get; set; }

    private void BuildStateMachine()
    {
        // The current state is persisted as a string in WorkflowSaga.CurrentState
        InstanceState(x => x.CurrentState);

        // The start command creates a new Saga whose Id is taken from the message
        Event(() => StartWorkflowCommandReceived,
            x => x.CorrelateById(ctx => ctx.Message.CorrelationId)
                  .SelectId(ctx => ctx.Message.CorrelationId));

        // All other events are routed to an existing Saga by their CorrelationId
        Event(() => TaskCreated, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
        Event(() => TaskTakedToWork, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
        Event(() => TaskDeclined, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
        Event(() => TaskApproved, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
    }
}
```

We now have a partial class in which we declare all states and events, plus the BuildStateMachine method, which describes how events are correlated with the Saga. For this, every event carries a special CorrelationId parameter: a Guid that travels between all connected systems and the monitoring systems.


As a result, if anything goes wrong, we can reconstruct the whole picture of what happened from the logs of all related systems. We include the CorrelationId in the messages the Saga sends, and the systems send it back in their notifications so that we can match each message to a specific Saga.
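The correlation mechanism can be sketched without a bus: a start message creates an instance keyed by its CorrelationId, and every later message is routed to the instance with the same Guid. The `ICorrelated` interface, the message records and `InMemorySagaRepository` are hypothetical; in MassTransit the saga repository performs this lookup based on the `CorrelateById` expressions declared in BuildStateMachine.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical contract: every message carries the Guid that ties it to one process instance.
public interface ICorrelated
{
    Guid CorrelationId { get; }
}

public sealed record StartWorkflow(Guid CorrelationId, string RequestNumber) : ICorrelated;
public sealed record TaskCreated(Guid CorrelationId, string TaskNumber) : ICorrelated;

public sealed class SagaData
{
    public Guid CorrelationId { get; init; }
    public string RequestNumber { get; init; } = "";
}

// Hypothetical in-memory stand-in for a saga repository.
public sealed class InMemorySagaRepository
{
    private readonly Dictionary<Guid, SagaData> _instances = new();

    // A start message creates a new instance keyed by its CorrelationId.
    public SagaData Start(StartWorkflow message)
    {
        var saga = new SagaData { CorrelationId = message.CorrelationId, RequestNumber = message.RequestNumber };
        _instances.Add(message.CorrelationId, saga);
        return saga;
    }

    // Any later message is routed to the existing instance with the same CorrelationId (or null).
    public SagaData? Find(ICorrelated message) =>
        _instances.TryGetValue(message.CorrelationId, out var saga) ? saga : null;
}
```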


And here is an example of the state machine itself:


```csharp
using Automatonymous;

public sealed partial class StateMachine : MassTransitStateMachine<WorkflowSaga>
{
    public StateMachine()
    {
        BuildStateMachine();

        // Which events are accepted in which state
        Initially(WhenStartWorkflowCommandReceived());
        During(AwaitingTaskCreated, WhenTaskCreated());
        During(AwaitingTaskTakedToWork, WhenTaskTakedToWork());
        During(AwaitingDecisionAboutTask, WhenTaskApproved(), WhenTaskDeclined());
    }

    // TaskManagerQueue and MailServiceQueue are Uri properties holding the
    // destination queue addresses; they are defined elsewhere in the class.

    private EventActivityBinder<WorkflowSaga, IStartWorkflowCommand> WhenStartWorkflowCommandReceived()
    {
        return When(StartWorkflowCommandReceived)
            .Then(ctx => ctx.Instance.SaveConfigurationRequestInfo(ctx.Data))
            .Send(TaskManagerQueue, ctx => new CreateTaskCommand(ctx.Instance))
            .TransitionTo(AwaitingTaskCreated);
    }

    private EventActivityBinder<WorkflowSaga, TaskCreatedNotification> WhenTaskCreated()
    {
        return When(TaskCreated)
            .Then(ctx => ctx.Instance.SaveCreatedTaskInfo(ctx.Data))
            .Send(MailServiceQueue, ctx => new NotifyRequestAuthorThatWorkflowStarted(ctx.Instance))
            .TransitionTo(AwaitingTaskTakedToWork);
    }

    private EventActivityBinder<WorkflowSaga, TaskTakedToWorkNotification> WhenTaskTakedToWork()
    {
        return When(TaskTakedToWork)
            .Then(ctx => ctx.Instance.MarkTaskAsTakedToWork(ctx.Data))
            .TransitionTo(AwaitingDecisionAboutTask);
    }

    private EventActivityBinder<WorkflowSaga, TaskApprovedNotification> WhenTaskApproved()
    {
        // Approval completes the workflow: the Saga moves to its final state
        return When(TaskApproved)
            .Then(ctx => ctx.Instance.MarkTaskAsApproved(ctx.Data))
            .Finalize();
    }

    private EventActivityBinder<WorkflowSaga, TaskDeclinedNotification> WhenTaskDeclined()
    {
        return When(TaskDeclined)
            .Then(ctx => ctx.Instance.MarkTaskAsDeclined(ctx.Data))
            .TransitionTo(Rejected);
    }
}
```

The constructor describes which events are handled in which state. Handling of each event is moved into a separate method for readability. All the logic for constructing messages lives in the messages themselves; otherwise, as the system grows more complex, the Saga bloats quite quickly.
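Keeping message construction inside the message can be sketched in plain C#: the command gets a constructor that takes the Saga instance and copies what it needs, so the state machine only writes `new CreateTaskCommand(instance)`. The `WorkflowSagaData` class, the command's fields and the title format below are hypothetical and only loosely follow the article's `WorkflowSaga`.

```csharp
using System;

// Hypothetical, trimmed-down Saga state (loosely follows WorkflowSaga from the article).
public sealed class WorkflowSagaData
{
    public Guid CorrelationId { get; set; }
    public string RequestNumber { get; set; } = "";
    public string RequestAuthor { get; set; } = "";
    public string InitialRequestViewUrl { get; set; } = "";
}

// Hypothetical command: all mapping from Saga state to message fields lives here,
// so the state machine definition stays short.
public sealed class CreateTaskCommand
{
    public CreateTaskCommand(WorkflowSagaData saga)
    {
        CorrelationId = saga.CorrelationId; // echoed back by the tracker in its notifications
        Title = $"Review request {saga.RequestNumber}";
        Description = $"Requested by {saga.RequestAuthor}";
        Link = saga.InitialRequestViewUrl;
    }

    public Guid CorrelationId { get; }
    public string Title { get; }
    public string Description { get; }
    public string Link { get; }
}
```

When the command needs a new field, only this constructor changes; the state machine definition stays untouched.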


Think carefully about conventions and readability. Because C# is an imperative language, describing states and actions declaratively in it is hard, and even simple state machines can quickly turn into a real mess.


Now a few words about the SagaInstance. The SagaInstance is a class that implements the SagaStateMachineInstance interface. It holds the properties that describe the state of the state machine; roughly speaking, it is the Saga's memory. We store in it all the data the Saga will need throughout its lifetime. This class also contains the logic for changing that data as the process runs.


Here is an example:


```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Automatonymous;

public class WorkflowSaga :
    SagaStateMachineInstance,
    ISagaWithState,
    ICreatedOnOffset,
    IModifiedOnOffset,
    ICreatedBy<string>,
    IModifiedBy<string>
{
    // Correlates incoming events with this Saga instance
    public Guid CorrelationId { get; set; }
    // Name of the current state (see InstanceState in BuildStateMachine)
    public string CurrentState { get; set; }

    public string InitialRequestViewUrl { get; set; }
    public string RequestNumber { get; set; }
    public string RequestAuthor { get; set; }
    public string RequestText { get; set; }

    public byte[] RowVersion { get; set; }
    public string CreatedBy { get; set; }
    public string ModifiedBy { get; set; }
    public DateTimeOffset CreatedOn { get; set; }
    public DateTimeOffset ModifiedOn { get; set; }
    public DateTimeOffset CompletedOn { get; set; }

    public virtual ICollection<RelatedTask> RelatedTasks { get; set; }

    public void SaveConfigurationRequestInfo(IStartWorkflowCommand command)
    {
        CorrelationId = command.CorrelationId;
        RequestNumber = command.RequestNumber;
        RequestAuthor = command.Author;
        RequestText = command.RequestText;
        InitialRequestViewUrl = command.InitialRequestViewUrl;
        CreatedOn = RuntimeContext.Current.DateTimeOffset.Now;
    }

    public void SaveCreatedTaskInfo(ITaskCreationInfo taskCreationInfo)
    {
        RelatedTasks.Add(new RelatedTask(taskCreationInfo));
    }

    public void MarkTaskAsTakedToWork(ITaskUpdatedInfo taskInfo)
    {
        UpdateTaskInfo(taskInfo, TaskStatus.TakedToWork);
    }

    public void MarkTaskAsApproved(TaskApprovedNotification taskInfo)
    {
        UpdateTaskInfo(taskInfo, TaskStatus.Completed, taskInfo.Comment);
        CompletedOn = RuntimeContext.Current.DateTimeOffset.Now;
    }

    public void MarkTaskAsDeclined(TaskDeclinedNotification taskInfo)
    {
        UpdateTaskInfo(taskInfo, TaskStatus.Declined, taskInfo.Comment);
        CompletedOn = RuntimeContext.Current.DateTimeOffset.Now;
    }

    // Finds the task the notification refers to and updates its status
    private void UpdateTaskInfo(ITaskUpdatedInfo taskInfo, TaskStatus taskStatus, string comment = null)
    {
        var task = RelatedTasks.Single(t => t.Number == taskInfo.Number);
        task.ModifiedBy = taskInfo.TaskModifiedBy;
        task.Comment = comment;
        task.Status = taskStatus;
    }
}
```

The example shows that the SagaInstance stores a CorrelationId for correlating events with the Saga and a CurrentState for storing the Saga's current state.


Error handling


What happens to a Saga if an error occurs while a message is being processed? This is an important question, since everyone wants the state machine to stay consistent even when something goes wrong. MassTransit's Saga handles this well.


As you may have noticed, there is not a single try/catch block in the examples above. The reason is simple: none is needed there. If an exception occurs during message processing, the message is returned to the queue and all changes are rolled back. Since all data manipulation happens in the same transaction as the Saga's, that transaction is simply never committed.
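The “all or nothing” behaviour can be modelled in a few lines. In the real system the rollback comes from the database transaction of the saga repository; this hypothetical sketch (`TransactionalHandler`, `SagaState`) only imitates it by letting the handler work on an immutable snapshot that is committed if no exception is thrown, and by putting the message back into the queue otherwise.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical immutable snapshot of Saga state.
public sealed record SagaState(string CurrentState, int HandledCount);

// Hypothetical sketch of transactional message handling (not MassTransit's implementation).
public sealed class TransactionalHandler
{
    public SagaState Committed { get; private set; } = new("Initial", 0);
    public Queue<string> Queue { get; } = new();

    // Processes one message; returns true if its changes were committed.
    public bool ProcessNext(Func<SagaState, string, SagaState> handler)
    {
        var message = Queue.Dequeue();
        try
        {
            var updated = handler(Committed, message); // works on an immutable snapshot
            Committed = updated;                        // "commit" only after success
            return true;
        }
        catch (Exception)
        {
            // Nothing was committed: the old state stays and the message goes back to the queue.
            Queue.Enqueue(message);
            return false;
        }
    }
}
```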


In general, manipulating anything other than the Saga's own state inside the Saga is bad practice. According to Enterprise Integration Patterns, the process manager should remain as “thin and dumb” as possible: it only dispatches commands to other systems and tracks state, doing no real work itself.


Of course, there are more complex scenarios in which exceptions must be handled by performing compensating actions. In those cases, the state machine's “.Catch” handler is used to intercept an exception of a specific type and then run the compensating logic.
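In MassTransit the handler is attached with `.Catch<TException>(...)` on the event binder. The plain-C# sketch below only models the semantics and does not use the MassTransit API: the `TaskTrackerRejectedException` type, the `CompensatingStep` class and the `Faulted` state name are hypothetical. One exception type is turned into a compensating command plus a transition to a dedicated state, while any other exception propagates so that the usual rollback and redelivery still apply.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception raised when the task tracker rejects a request.
public sealed class TaskTrackerRejectedException : Exception
{
    public TaskTrackerRejectedException(string message) : base(message) { }
}

// Hypothetical sketch of the Catch idea (not the Automatonymous API).
public sealed class CompensatingStep
{
    public string CurrentState { get; private set; } = "AwaitingTaskCreated";
    public List<string> SentCommands { get; } = new();

    public void Handle(Action action)
    {
        try
        {
            action();
            CurrentState = "AwaitingTaskTakedToWork";
        }
        catch (TaskTrackerRejectedException ex)
        {
            // Compensation: notify the request author and park the Saga in a dedicated state.
            SentCommands.Add($"NotifyAuthor: {ex.Message}");
            CurrentState = "Faulted";
        }
        // Any other exception type is not caught here and propagates to the caller.
    }
}
```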


And if you only need to record an exception, for example in a log, it is better to use an observer.
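MassTransit exposes observer interfaces for this purpose, for example `IConsumeObserver`, which is notified before and after consumption and on faults. To keep the example self-contained, the sketch below uses a hypothetical `IFaultObserver` interface and `ObservedPipeline` class rather than the MassTransit types: observers are told about a failure, and the exception is then rethrown, so they cannot change how the message is handled.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer contract: notified about failures, but cannot change the outcome.
public interface IFaultObserver
{
    void OnFault(string messageType, Exception exception);
}

public sealed class LoggingFaultObserver : IFaultObserver
{
    public List<string> Log { get; } = new();

    public void OnFault(string messageType, Exception exception) =>
        Log.Add($"{messageType} failed: {exception.Message}");
}

public sealed class ObservedPipeline
{
    private readonly List<IFaultObserver> _observers = new();

    public void Connect(IFaultObserver observer) => _observers.Add(observer);

    // Runs the handler; on failure every observer is notified and the exception is rethrown,
    // so the usual rollback and redelivery behaviour stays unchanged.
    public void Consume(string messageType, Action handler)
    {
        try
        {
            handler();
        }
        catch (Exception ex)
        {
            foreach (var observer in _observers)
                observer.OnFault(messageType, ex);
            throw;
        }
    }
}
```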


Now imagine that we have already executed a Send command while processing a message, and then an exception is thrown. What happens to the command sent at that step? Once a message has gone out, there is no taking it back, is there? This case has been thought through as well.


When configuring the bus, you can enable the UseInMemoryOutbox option. It holds outgoing messages until the current step has completed; if an exception occurs, the messages are not sent at all. Here is an excerpt from the documentation:


```csharp
/// <summary>
/// Includes an outbox in the consume filter path, which delays outgoing messages until the return path
/// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be
/// nearly complete with only the ack remaining. If an exception is thrown, the messages are not sent/published.
/// </summary>
/// <param name="configurator">The pipe configurator</param>
public static void UseInMemoryOutbox(this IConsumePipeConfigurator configurator)
```
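The idea behind the outbox fits in a few lines. This is a hypothetical sketch, not MassTransit's implementation: the handler “sends” into a buffer, and the buffer is flushed to the transport only after the handler has returned without throwing. In MassTransit itself, the behaviour is switched on by calling `UseInMemoryOutbox()` on the receive endpoint configuration, which is a consume pipe configurator as the signature above indicates.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the in-memory outbox idea: messages "sent" by the handler are
// buffered and released to the transport only after the handler has returned successfully.
public sealed class InMemoryOutbox
{
    private readonly List<string> _pending = new();

    // Called by the handler instead of sending directly.
    public void Send(string message) => _pending.Add(message);

    public static void Consume(Action<InMemoryOutbox> handler, List<string> transport)
    {
        var outbox = new InMemoryOutbox();
        handler(outbox);                     // if this throws, the buffered messages are discarded
        transport.AddRange(outbox._pending); // reached only when the handler succeeded
    }
}
```

If the handler throws after calling `Send`, the transport never sees that message, which is exactly the guarantee the Saga needs when its transaction is rolled back.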

Tests


At first glance, testing an asynchronous state machine sounds like no fun at all. In practice it works out fine: MassTransit provides a good testing framework that fully covers our needs for testing the state machine.


The framework provides an in-memory bus implementation (InMemoryTestHarness) that lets you send and receive messages without going through RabbitMQ or any other broker.


Here is an example:


```csharp
// TestFixtureBase (with the DI Kernel) and the test data constants
// (SagaId, RequestAuthor, TaskNumber, TaskUrl, User, ...) come from the project.
[TestFixture]
public class SagaTestsBase : TestFixtureBase
{
    protected const string HostName = "HostName";

    protected InMemoryTestHarness Harness;
    protected StateMachine StateMachine;
    protected StateMachineSagaTestHarness<WorkflowSaga, StateMachine> Saga;

    [SetUp]
    public void SetUp()
    {
        StateMachine = (StateMachine)Kernel.Get<MassTransitStateMachine<WorkflowSaga>>();
        Harness = new InMemoryTestHarness(HostName);
        Saga = Harness.StateMachineSaga<WorkflowSaga, StateMachine>(StateMachine);
    }

    [TearDown]
    public async Task TearDown()
    {
        await Harness.Stop();
    }

    protected async Task<WorkflowSaga> InitializeSaga()
    {
        await Harness.Start();

        var command = new TestStartWorkflowCommand
        {
            CorrelationId = SagaId,
            Author = RequestAuthor,
            InitialRequestViewUrl = InitialRequestViewUrl,
            RequestText = RequestText,
            RequestNumber = RequestNumber,
        };

        await Harness.InputQueueSendEndpoint.Send<IStartWorkflowCommand>(command);

        // Make sure the command has been consumed, i.e. the Saga has been created
        Assert.IsTrue(Harness.Consumed.Select<IStartWorkflowCommand>().Any());

        var currentSaga = Saga.Created.Contains(SagaId);
        currentSaga.RelatedTasks = new List<RelatedTask>();
        return currentSaga;
    }

    [Test]
    public async Task CheckCurrentStateWhenStartWorkflowCommand()
    {
        var saga = await InitializeSaga();

        Assert.IsNotNull(saga);
        Assert.AreEqual(StateMachine.AwaitingTaskCreated.Name, saga.CurrentState);
    }
}

public class WhenTaskCreated : SagaTestsBase
{
    private async Task<WorkflowSaga> InitializeState()
    {
        var saga = await InitializeSaga();
        saga.CurrentState = StateMachine.AwaitingTaskCreated.Name;
        InitializeRelatedTask(saga);

        await SendTaskCreatedNotification();
        Assert.IsTrue(Harness.Consumed.Select<TaskCreatedNotification>().Any());

        return saga;
    }

    [Test]
    public async Task SaveWorkflowDataWhenTaskCreated()
    {
        var saga = await InitializeState();
        var taskInfo = saga.RelatedTasks
            .First(task => task.PlannerTaskType == PlannerTaskType.DPORPApprove);

        Assert.AreEqual(TaskNumber, taskInfo.Number);
        Assert.AreEqual(TaskUrl, taskInfo.TaskUrl);
        Assert.AreEqual(SagaId, taskInfo.SagaCorrelationId);
        Assert.AreEqual(TaskStatus.Created, taskInfo.Status);
        Assert.AreEqual(User, taskInfo.ModifiedBy);
        Assert.AreEqual(StateMachine.AwaitingTaskTakedToWork.Name, saga.CurrentState);
    }

    [Test]
    public async Task SendsMailWhenTaskCreated()
    {
        // The mock consumer must be registered before the harness is started
        var mailConsumer = Harness.Consumer<MockConsumer<ISendEmailMessageWithTemplateCommand>>(
            RabbitMqRouting.QueueNames.SendEmailsQueueName);

        await InitializeState();

        Assert.IsTrue(mailConsumer.Consumed.Select<ISendEmailMessageWithTemplateCommand>().Any());
    }

    private async Task SendTaskCreatedNotification()
    {
        await Harness.InputQueueSendEndpoint.Send(new TaskCreatedNotification
        {
            TaskUrl = TaskUrl,
            Number = TaskNumber,
            TaskModifiedBy = User,
            CorrelationId = SagaId
        });
    }
}
```

The tests run fairly quickly: for example, 850 tests take about 21 seconds on one developer's machine.


Useful tips


In conclusion, here is a list of useful tips based on our experience.


  1. It is best to put the contracts and bus communication schemas into a private NuGet package. That way the sending and receiving sides will never disagree on naming. You can also put constants with queue and host names into the package. A NuGet feed can be set up in a day; some source-control systems support NuGet feeds, and paid private feeds are also available.


  2. Understand the difference between Send and Publish. Use Send when there is a single recipient and you know exactly the name of the queue you are sending the command to. Publish is meant for broadcasting events to any number of subscribers. Details are at the link in the original post.


  3. If you need request/response messaging, it is better to add the name of the response queue to the contract than to use MassTransit's own Request/Response mechanism, which MassTransit itself advises avoiding: it greatly reduces reliability, and you lose all the benefits of asynchrony. But if you really need an answer within a bounded time, a direct call is the better choice. This is described best in the same book, Enterprise Integration Patterns.


  4. The Saga should be thin. Try to move all heavy logic into other systems; the Saga should just move through its states and send messages left and right.


  5. Add to every message a CorrelationId that travels between systems. This makes it much easier to analyze the logs and link all messages into a single picture. MassTransit itself relies on this too: messages that implement the CorrelatedBy<Guid> interface carry a CorrelationId.


  6. Set up logging and monitoring in your systems; it never hurts. We described our experience in a separate article.
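To make tip 2 concrete, here is a hypothetical in-memory bus (`TinyBus`, with made-up queue names in the usage below); it is not MassTransit's API. `Send` puts a command into exactly one named queue that the sender knows, while `Publish` hands a copy of an event to every queue subscribed to that message type, without the publisher knowing who the subscribers are.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory bus illustrating Send (point-to-point) vs Publish (fan-out).
public sealed class TinyBus
{
    private readonly Dictionary<string, Queue<object>> _queues = new();
    private readonly Dictionary<Type, List<string>> _subscriptions = new();

    public Queue<object> GetQueue(string name)
    {
        if (!_queues.TryGetValue(name, out var queue))
            _queues[name] = queue = new Queue<object>();
        return queue;
    }

    // Registers a queue as a subscriber for messages of type T.
    public void Subscribe<T>(string queueName)
    {
        if (!_subscriptions.TryGetValue(typeof(T), out var list))
            _subscriptions[typeof(T)] = list = new List<string>();
        list.Add(queueName);
        GetQueue(queueName); // make sure the queue exists
    }

    // Command: the sender names the single destination queue.
    public void Send(string queueName, object command) => GetQueue(queueName).Enqueue(command);

    // Event: every queue subscribed to T receives its own copy.
    public void Publish<T>(T message)
    {
        if (_subscriptions.TryGetValue(typeof(T), out var queueNames))
            foreach (var name in queueNames)
                GetQueue(name).Enqueue(message!);
    }
}
```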
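Tip 3 can be sketched the same way. The request contract carries the name of the queue the answer should go to, and the response echoes the CorrelationId so the Saga can match it to the right instance. The `CheckAccessRequest` and `CheckAccessResponse` contracts and `AccessService` are hypothetical examples, not part of the system described here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical contracts: the request names the queue where the answer should be sent.
public sealed record CheckAccessRequest(Guid CorrelationId, string UserName, string ReplyToQueue);
public sealed record CheckAccessResponse(Guid CorrelationId, bool Allowed);

public static class AccessService
{
    // The responder does not know the caller; it sends the answer to the queue named in the request
    // and copies the CorrelationId so the caller can correlate it.
    public static void Handle(CheckAccessRequest request, IDictionary<string, Queue<object>> queues)
    {
        var response = new CheckAccessResponse(request.CorrelationId, Allowed: request.UserName != "guest");
        queues[request.ReplyToQueue].Enqueue(response);
    }
}
```

Because the answer arrives as an ordinary message, the Saga can “fall asleep” while waiting for it, just as it does while waiting for a human decision.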

Source: https://habr.com/ru/post/412793/

