Pixabay

Preface

I was asked the legitimate question… how the heck do I actually use a TkbmMWMultithreadMessageQueueProcessor. This post provides an easy to follow example.

Since message queue processors are a significant part of the WIB (Wide Information Bus), it is recommended that you also read the previous WIB blog posts: WIB #1 and WIB #2 to get the overall understanding of what the WIB actually is.

Example

Create a simple VCL project in Delphi.

Add a TkbmMWMemoryMessageQueue and a TkbmMWMultithreadMessageQueueProcessor to the form. You can obviously also create those at runtime instead.

I have added a couple of buttons for “Start”, “Stop” and “Generate random messages”.

The purpose of the Start button is to setup the message queue processor, and start it to run continuously. The Stop button stops the processor’s processing, and the generate random messages button create a random number of messages with semirandom subjects, that we can use to show the processing of the message processor.

Further I have added an event handler to the OnProcess event of the message queue processor.

The form looks like this:

And the code for the unit looks like this:

unit Unit1;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, 
  System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs,
  kbmMWGlobal,
  kbmMWCustomTransport,
  kbmMWCustomMessagingTransport, Vcl.ExtCtrls, Vcl.StdCtrls;

type
  TForm1 = class(TForm)
    kbmMWMultithreadMessageQueueProcessor1: TkbmMWMultithreadMessageQueueProcessor;
    kbmMWMemoryMessageQueue1: TkbmMWMemoryMessageQueue;
    Button1: TButton;
    Button2: TButton;
    Button3: TButton;
    procedure kbmMWMultithreadMessageQueueProcessor1Process(
      AProcessor: TkbmMWCustomMessageProcessor;
      AMessage: IkbmMWCustomTransportStream;
      var AAction: TkbmMWMessageProcessingSuccessAction);
    procedure Button1Click(Sender: TObject);
    procedure Button2Click(Sender: TObject);
    procedure Button3Click(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

procedure TForm1.Button1Click(Sender: TObject);
begin
     if kbmMWMultithreadMessageQueueProcessor1.Processing then
        exit;
     kbmMWMultithreadMessageQueueProcessor1.Queue:=kbmMWMemoryMessageQueue1;
     kbmMWMultithreadMessageQueueProcessor1.PoolSize:=10;
     kbmMWMultithreadMessageQueueProcessor1.Operation:=mpoContinous;
     kbmMWMultithreadMessageQueueProcessor1.Start;
end;

procedure TForm1.Button2Click(Sender: TObject);
begin
     kbmMWMultithreadMessageQueueProcessor1.Stop;
end;

procedure TForm1.Button3Click(Sender: TObject);
var
   i:integer;
   msg:IkbmMWStandardMessageTransportStream;
begin
     for i:=0 to random(100)*10 do
     begin
          msg:=TkbmMWStandardMessageTransportStream.Create(nil,nil);
          msg.SubjectHeader.Subject:='SOME.HEADER.'
                      +inttostr(i)+'.'+inttostr(Random(100));
          kbmMWMemoryMessageQueue1.PushMessage(msg);
     end;
end;

procedure TForm1.kbmMWMultithreadMessageQueueProcessor1Process(
  AProcessor: TkbmMWCustomMessageProcessor;
  AMessage: IkbmMWCustomTransportStream;
  var AAction: TkbmMWMessageProcessingSuccessAction);
begin
     OutputDebugString(PChar('Processing message: '
                  +AMessage.SubjectHeader.Subject
                  +', thread:'+inttostr(TkbmMWCustomThread.CurrentThreadID)));
     AAction:=mwpsaCommit; // Message processed. Permanently take it off the queue.
end;

end.

The setup of the message queue processor is pretty simple. The queue it will operate on, is set, and its operation is defined to be continuous. Poolsize is set to 10 which limits the number of threads that can process messages from the queue to 10 concurrent threads.

Finally the Start method is called. Start will immediately start to process the contents of the queue. If Operation had been set to something else than continuous, it would only process one, or all messages in the queue (depending on operation), and then stop running. Setting it to continuous will allow it to continuously check if there are new messages in the queue to process.

Since the multi threaded message queue processor is an asynchronous processor, Start will return immediately, but leave the queue processing running in the background.

The stop button simply calls the stop method of the processor, which will stop the processing and shut down threads.

The event handler for the message queue processor will be called for each message being processed. As 10 messages may be processed concurrently, the handler needs to be thread safe. Hence updating the GUI is a no no without proper synchronization, and doing synchronization would effectively result in the multithreaded operation being stalled by the GUI updates, and essentially run only a single thread at a time which negates the benefit of the multithreaded nature of this queue processor.

If you need to keep track of “how far” it has come (ie. count processed messages and display that in the GUI), then atomically increment a shared variable, and using a TTimer, present the value of that variable in the GUI.

Or you may choose to use another “blocking” queue processor that runs in the main thread and only return from Start when the operation is done.

An important aspect of the event handler is returning a status to the message queue processor, that indicates if the message was successfully processed or if it failed or if the message was rejected of some reason or if it should be retried.

In our case we return the commit status, which means that the message, that was tentatively popped from the queue, was handled successfully and must now completely be removed from the queue.

/Kim/C4D

Loading

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.