This blog post will go into more detail about one of the fundamentals behind the WIB, namely messages and message queues.
A WIB message is an interface-based, reference-counted class instance. It contains a number of properties specifying the contents itself, security/authentication data, and a subject header, which gives access to the quality of service (QoS) to be adhered to when handling the message, as well as routing and tracking information.
Hence, in terms of a snail mail type letter, the message contains the letter itself (the contents), what is written on the envelope, like sender and recipient information (the routing information), the postal stamp and additional manifest papers stating service level of delivery (the QoS), and the tracking service for that letter, embedded directly into it.
WIB messaging transports act as post offices. They deal only with messages, and usually do so only via message queues, which act as storage bins in the post office.
If you have data contents you wish to send, it is easy to do so by using the SendMessage or SendPrioritizedMessage methods of the WIB transports. Behind the scenes kbmMW then generates the relevant message instance with the provided values and pushes it onto the outbound queue that is attached to the transport.
There exist different types of messages, which are used for different purposes. This blog post mostly focuses on messages of type mwmtMessage, which is the type you, as a developer, will usually use:
TkbmMWMessageType = (mwmtUnknown,mwmtRequest,mwmtResponse,mwmtServiceCall, mwmtMessage,mwmtSubscribe,mwmtUnsubscribe,mwmtCache,mwmtThrottle,mwmtEvent,mwmtHandshake);
SendMessage and SendPrioritizedMessage above will always send messages of type mwmtMessage, which is the message type for all sorts of user data. You usually do not specify the message type manually, but use the WIB to generate the relevant message for you.
var
  m:IkbmMWCustomMessageTransportStream;
...
begin
  m:=transport.CreateMessage(Subject,Target,ClientIdent,Priority,UserStream,Args,Options);
  ...
  queue.PushMessage(m);
end;
In the above, the message type generated is also mwmtMessage.
Methods are available for generating the other message types in the situations where they are needed. For example, you can generate mwmtEvent type messages using the CreateEvent or CreatePrioritizedEvent methods of the transport.
If you really want to get advanced and create any supported message type, you can use the CreateTransportStream function. It takes a message type and an interface based information instance.
The message itself is basically portable between any nodes having client or server transports, but the information instance contains transport-specific information, which follows the message for as long as it exists in the context of that particular transport.
For example if you know the message will be pushed onto a messaging client transport, you will create the info object as a TkbmMWCustomMessagingClientTransportInfo instance. If it is for pushing via a server side transport, you will create the info object as a TkbmMWCustomMessagingServerTransportInfo instance.
The two different types of info instances have different features because a server node knows how to process requests and return responses for many client nodes simultaneously, while the client node knows how to make requests and optionally wait for a response.
var
  m:IkbmMWCustomMessageTransportStream;
  info:IkbmMWMessagingTransportInfo;
...
  info:=TkbmMWCustomMessagingServerTransportInfo.Create;
  info.Subject:=Subject;
  m:=transport.CreateTransportStream(mwmtSubscribe,info);
...
As mentioned, this is purely for information for very advanced users and is not something you will need to know anything about to use the WIB for 99.9% of all messaging you can think of.
The message queue intro
Each WIB (messaging) transport typically requires two message queues to be attached to it: one that handles inbound messages, and one for outbound messages.
You can see the message queue as an intermediate storage of the message before it is processed. Since it is indeed a storage, there can be different requirements for that storage.
It could be a temporary storage… meaning that if someone stumbles on the power cord and the computer holding the queues dies, the messages in the temporary queues will have been lost, and the only way to “get it back” is to recreate the messages from whatever data you may have elsewhere. The TkbmMWMemoryMessageQueue is one such temporary message storage.
Or it could be a resilient storage, meaning one that generally will not lose already stored messages, even if the power goes off. The TkbmMWFileStoreMessageQueue is a resilient message storage. Hence after a crash, you will usually be able to get up and running again with no messages, or only the last message, being lost. However, you may trade some resiliency for performance, since the file based message queue contains various caching features which you can tune.
Messages are pushed (placed into the queue) and popped off (taken off the queue).
Since message queues are following the FIFO (First In First Out) principle, it means a message first pushed is the message that is first popped.
Pushing a message onto a queue via the PushMessage method can be done either normally or tentatively. A tentative push means that the message is indeed put on the queue, but it is not yet visible and thus will not show up as being on the queue while it is still tentative.
This is something that is used by a couple of advanced features in the WIB:
- Grouping – Messages can be grouped together in the queue and only be released for processing and further sending when all relevant messages for the group have been made ready on the queue. The messages are still processed one by one when they are no longer marked tentative.
- Batching – Messages of same priority, subject and QoS can be automatically batched together to one single message. This can improve performance in some situations where huge amounts of identical messages are sent (the user data can be different).
Usually messages are pushed non tentatively.
Popping messages from a queue always happens in two steps. First one uses TentativeMessagePop, which returns the oldest message in the queue (although respecting priorities and ignoring tentatively pushed messages). When the message has been processed successfully, for example by doing something with it or pushing it into another queue, one should call CommitMessagePop, which tells the queue that the message has been handled correctly and can be removed from the queue. If something went wrong during the processing, one should call RollbackMessagePop, which tells the queue that something bad happened, and the message is left on the queue.
If enough rollbacks happen for a message, then a stall situation may occur. See later.
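To make the two-step pop a bit more concrete, here is a minimal, self-contained sketch of the idea. It does not use kbmMW at all: TToyQueue and its TentativePop, CommitPop and RollbackPop methods are hypothetical stand-ins invented for this illustration, and the real TentativeMessagePop, CommitMessagePop and RollbackMessagePop may well have different signatures.

```pascal
program TwoStepPop;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

type
  // Toy stand-in for a message queue. Not a kbmMW class.
  TToyQueue = class
  private
    FItems: TStringList;  // oldest message lives at index 0
    FPending: Boolean;    // True while a tentative pop is outstanding
  public
    constructor Create;
    destructor Destroy; override;
    procedure Push(const AMsg: string);
    function TentativePop(out AMsg: string): Boolean;
    procedure CommitPop;
    procedure RollbackPop;
    function Count: Integer;
  end;

constructor TToyQueue.Create;
begin
  inherited Create;
  FItems := TStringList.Create;
end;

destructor TToyQueue.Destroy;
begin
  FItems.Free;
  inherited Destroy;
end;

procedure TToyQueue.Push(const AMsg: string);
begin
  FItems.Add(AMsg);
end;

function TToyQueue.TentativePop(out AMsg: string): Boolean;
begin
  // Hand out the oldest message, but keep it stored until it is committed.
  Result := (FItems.Count > 0) and not FPending;
  if Result then
  begin
    AMsg := FItems[0];
    FPending := True;
  end;
end;

procedure TToyQueue.CommitPop;
begin
  // Processing succeeded: only now does the message leave the queue.
  if FPending then
  begin
    FItems.Delete(0);
    FPending := False;
  end;
end;

procedure TToyQueue.RollbackPop;
begin
  // Processing failed: the message simply stays where it was.
  FPending := False;
end;

function TToyQueue.Count: Integer;
begin
  Result := FItems.Count;
end;

var
  Q: TToyQueue;
  Msg: string;
begin
  Q := TToyQueue.Create;
  try
    Q.Push('first');
    Q.Push('second');

    // First attempt: processing fails, so the pop is rolled back.
    if Q.TentativePop(Msg) then
    begin
      try
        raise Exception.Create('simulated processing failure');
      except
        Q.RollbackPop;
      end;
    end;
    WriteLn('After rollback: ', Q.Count, ' message(s) queued');

    // Second attempt: the same message is handed out again and committed.
    if Q.TentativePop(Msg) then
      Q.CommitPop;
    WriteLn('After commit of "', Msg, '": ', Q.Count, ' message(s) queued');
  finally
    Q.Free;
  end;
end.
```

The point of the design is that a message is never lost between being handed out and being handled: it only disappears from the queue once the consumer has said that all went well.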
A message queue can be processed by one of the many message processors that the WIB provides. All messaging transports internally use a message processor for each of the inbound and outbound queues.
The message processor basically pops messages off the queue in various ways, and provides events which can be used for message processing and message processing failure handling.
A message processor can process one or more messages depending on the chosen processing mode:
- mpoAll – Process all messages. When done, finish.
- mpoBlockAll – Wait for a message, then process all. When done, finish.
- mpoOne – Process one message. When done, finish.
- mpoBlockOne – Wait for a message, then process one. When done, finish.
- mpoContinous – Process all messages until Terminate property is set to true.
The following generic message processors currently exist:
- TkbmMWSyncMessageQueueProcessor – Process messages in the same thread that is calling the Process method.
- TkbmMWAsyncMessageQueueProcessor – Process messages in a different thread from the one calling the Process method.
- TkbmMWMultithreadMessageQueueProcessor – Process messages in a number of separate threads concurrently.
- TkbmMWGroupedMultithreadMessageQueueProcessor – Process messages in a number of separate threads concurrently, and obey message grouping requests.
In addition there are a number of specialised transport oriented delivery and inbound message queue processors, which all descend from the above, but have specific handling mechanisms.
Usually the FIFO message order is obeyed, but only for as long as the messages have the same priority. kbmMW’s message queues support prioritisation of messages, such that higher prioritised messages may jump the queue to get in front of lower prioritised messages already in the queue.
A message can have one of 256 priorities. The default priority is 128 (KBMMW_MESSAGEPRIORITY_NORMAL), highest priority is 0 (KBMMW_MESSAGEPRIORITY_HIGHEST) and lowest priority is 255 (KBMMW_MESSAGEPRIORITY_LOWEST).
Messages of type mwmtSubscribe and mwmtUnsubscribe are normally pushed to a queue given the default priority of KBMMW_MESSAGEPRIORITY_SUBSCRIPTION (which is KBMMW_MESSAGEPRIORITY_NORMAL minus 10).
Subscription messages are usually produced when a node announces its subscriptions to other nodes, to tell them what it is interested in receiving. If its announcement has not gone thru, for example due to high load on the queue, it may never receive the data it wants. For that reason subscription related messages are given higher priority than standard data messages.
To send messages with a specific priority, either use one of the SendPrioritizedxxx methods of the transport, or CreateMessage with the relevant priority as argument.
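The combination of FIFO order and priorities can be sketched as a stable insert: a new message is placed behind every queued message of the same or higher priority. The sketch below is only an illustration of that ordering rule under my own assumptions. TToyMessage and Push are hypothetical names, the priority constants are redeclared locally with the values given above, and kbmMW’s queues are certainly implemented differently internally.

```pascal
program PriorityOrder;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

const
  // Redeclared locally with the values stated in the text.
  KBMMW_MESSAGEPRIORITY_HIGHEST      = 0;
  KBMMW_MESSAGEPRIORITY_NORMAL       = 128;
  KBMMW_MESSAGEPRIORITY_SUBSCRIPTION = KBMMW_MESSAGEPRIORITY_NORMAL - 10;

type
  // Toy message. Not a kbmMW type.
  TToyMessage = record
    Priority: Byte;  // 0 = highest, 255 = lowest
    Body: string;
  end;

var
  Queue: array of TToyMessage;  // index 0 is popped first

// Insert behind all messages of equal or higher priority, so that
// FIFO order is kept among messages sharing the same priority.
procedure Push(APriority: Byte; const ABody: string);
var
  Idx, I: Integer;
begin
  Idx := 0;
  while (Idx < Length(Queue)) and (Queue[Idx].Priority <= APriority) do
    Inc(Idx);
  SetLength(Queue, Length(Queue) + 1);
  for I := High(Queue) downto Idx + 1 do
    Queue[I] := Queue[I - 1];
  Queue[Idx].Priority := APriority;
  Queue[Idx].Body := ABody;
end;

var
  I: Integer;
begin
  Push(KBMMW_MESSAGEPRIORITY_NORMAL, 'data A');
  Push(KBMMW_MESSAGEPRIORITY_NORMAL, 'data B');
  Push(KBMMW_MESSAGEPRIORITY_SUBSCRIPTION, 'subscribe');
  Push(KBMMW_MESSAGEPRIORITY_HIGHEST, 'urgent');

  // Lists: urgent, subscribe, data A, data B.
  for I := 0 to High(Queue) do
    WriteLn(Queue[I].Priority, ' ', Queue[I].Body);
end.
```

Note how the subscription message, although pushed after both data messages, ends up in front of them, while the two data messages keep their original order.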
Message queues can be limited in how many messages can be put into them before they detect congestion. By default there is no congestion checking, but it is important in a production system, where the speed of popping messages off the queues cannot be guaranteed to be faster than the speed of pushing new messages onto them.
If congestion management is not enabled, you will at some point run out of memory, or end up with a very slow system with extremely delayed messages.
Congestion is checked at the time a new message is pushed to the queue.
First the property CongestionThresholdLow is checked if set >0. If so, and the new message’s priority is the same or lower (higher number) than CongestionPriorityLow, and the number of messages in the queue is larger than CongestionThresholdLow, then congestion has been detected.
Alternatively, if no congestion has been detected, the same test is done against CongestionPriority and its threshold value CongestionThreshold (if >0).
Congestion management is enabled by setting a couple of properties on the message queue component.
- CongestionPriorityLow – (Default KBMMW_MESSAGEPRIORITY_LOW) Primary congestion priority test. If a new message has higher priority than this, it is not checked for congestion.
- CongestionThresholdLow – (Default 0 = no checking). The max amount of messages allowed in queue when checking against CongestionPriorityLow.
- CongestionPriority – (Default KBMMW_MESSAGEPRIORITY_VERYHIGH) Secondary congestion priority test. If a new message has higher priority than this, it is not checked for congestion.
- CongestionThreshold – (Default 0 = no checking). The max amount of messages allowed in queue when checking against CongestionPriority.
- CongestionDelay – If Options declare that push delays should occur upon congestion, this is the delay in msecs. Default 500.
- Options – A number of options for how to handle various situations on the queue. These options are interesting in relation to congestion management:
  - mwmqoRejectOnCongestion – If congestion is detected, the new message is instead pushed to the message queue referenced by the message queue property RejectQueue. PushMessage will return true.
  - mwmqoDeleteOnCongestion – If congestion is detected, the new message is deleted. PushMessage will return true.
  - mwmqoDelayOnCongestion – If congestion is detected, a delay will happen before the message is pushed on the queue. PushMessage will return true.
  - mwmqoFailOnCongestion – If congestion is detected, PushMessage will return false.
  - mwmqoPurgeOnCongestion – If congestion is detected, enough messages of same or lower priority are purged from the queue so the new message can be pushed on it. If a push can happen PushMessage will return true else false.
  - mwmqoPurgeAllPrioritiesOnCongestion – If congestion is detected, enough messages of all priorities (starting with lowest first) are purged from the queue so the new message can be pushed on it. If a push can happen PushMessage will return true else false.
  - mwmqoExcludeTentativelyPushedFromCongestion – Do not count tentatively pushed messages in congestion count.
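The two-level congestion test described above boils down to a small boolean function. The following is a sketch of my reading of that description, not kbmMW’s actual code. TCongestionSettings and IsCongested are hypothetical names, and the numeric priorities 192 and 32 are made-up example values, since the numeric values of KBMMW_MESSAGEPRIORITY_LOW and KBMMW_MESSAGEPRIORITY_VERYHIGH are not stated here.

```pascal
program CongestionCheck;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  // Local stand-in for the four congestion properties of a queue.
  TCongestionSettings = record
    PriorityLow: Byte;      // models CongestionPriorityLow
    ThresholdLow: Integer;  // models CongestionThresholdLow, 0 = no checking
    Priority: Byte;         // models CongestionPriority
    Threshold: Integer;     // models CongestionThreshold, 0 = no checking
  end;

// Remember: a numerically larger priority value means a LOWER priority.
function IsCongested(const S: TCongestionSettings;
  MsgPriority: Byte; QueuedCount: Integer): Boolean;
begin
  // Primary test, aimed at low priority traffic.
  Result := (S.ThresholdLow > 0)
    and (MsgPriority >= S.PriorityLow)
    and (QueuedCount > S.ThresholdLow);
  // Secondary test, only if the primary one found nothing.
  if not Result then
    Result := (S.Threshold > 0)
      and (MsgPriority >= S.Priority)
      and (QueuedCount > S.Threshold);
end;

var
  S: TCongestionSettings;
begin
  S.PriorityLow := 192;    // hypothetical example value
  S.ThresholdLow := 1000;
  S.Priority := 32;        // hypothetical example value
  S.Threshold := 5000;

  // Low priority message, 1500 queued: caught by the primary test.
  WriteLn(IsCongested(S, 200, 1500));
  // Normal priority message, 1500 queued: neither test triggers.
  WriteLn(IsCongested(S, 128, 1500));
  // Normal priority message, 6000 queued: caught by the secondary test.
  WriteLn(IsCongested(S, 128, 6000));
  // Highest priority message, 6000 queued: exempt from both tests.
  WriteLn(IsCongested(S, 0, 6000));
end.
```

With settings like these, low priority traffic is held back early, normal traffic only once the queue is really full, and the most important messages always get thru.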
If something in an application fails to process a message, the message will usually automatically be left on the queue to be retried in a moment. However if there is a permanent problem (a programming bug or some resource that has run out), the message will constantly be tentatively popped off the queue and rolled back onto it again, thus potentially stalling the queue.
For that reason the queues support stall management, which basically describes how many times a message can be rolled back to the queue before stall management kicks in, and what to do with the stalled message.
These properties on the queue component are interesting for stall management:
- StalledThreshold – If >0 (default 0) then if a message is rolled back StalledThreshold times, stall management is activated.
- Options – The following options are interesting for stall management:
  - mwmqoDeleteStalledMessage – The stalled message is deleted from the queue.
  - mwmqoRejectStalledMessage – The stalled message is rejected from the queue and pushed on the RejectQueue.
- StallQueue – If set, the stalled message is pushed onto the StallQueue.
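The threshold logic itself is just a rollback counter per message. Here is a tiny sketch of that counting, with a local StalledThreshold variable standing in for the queue property. Exactly how kbmMW counts rollbacks internally is an assumption on my part, and the cap of 10 attempts only exists to keep the demo loop finite.

```pascal
program StallCounter;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

var
  StalledThreshold: Integer;  // stand-in for the queue property, 0 = no stall handling
  Rollbacks: Integer;         // rollbacks seen so far for one message
  Stalled: Boolean;
begin
  StalledThreshold := 3;
  Rollbacks := 0;
  Stalled := False;

  // Simulate a message whose processing fails on every attempt.
  // The loop is capped at 10 attempts so it also ends when the threshold is 0.
  while (not Stalled) and (Rollbacks < 10) do
  begin
    Inc(Rollbacks);  // tentative pop, failure, rollback
    Stalled := (StalledThreshold > 0) and (Rollbacks >= StalledThreshold);
    WriteLn('rollback ', Rollbacks, ', stalled: ', Stalled);
  end;

  if Stalled then
    WriteLn('stall handling takes over: delete, reject or move to a stall queue')
  else
    WriteLn('still retrying after ', Rollbacks, ' rollbacks');
end.
```

Setting StalledThreshold to 0 in the sketch shows the default behaviour: the message is retried over and over and never declared stalled.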
Message states and lifetime
kbmMW’s messages are reference counted, which means they can be put into multiple queues at the same time, but only count once memory wise. The moment no one references the message it automatically dies and is removed from memory.
A message goes thru several states in its life from raw data to packed message and back to raw data.
It is important to know (for advanced users) that a message has a current state, and since the message can be shared between multiple queues, that state is also shared.
Each message goes thru the following packing states (when a message is being prepared by kbmMW for streaming):
TkbmMWTransportStreamState = (mwtssNone, mwtssEnvelopeStart, mwtssMessageHeader, mwtssHeader, mwtssBody, mwtssBatchedMessages, mwtssEnvelopeEnd, mwtssPacked, mwtssSigned, mwtssStreamReady);
It initially has the state mwtssNone, but will automatically at some point progress thru the states until it reaches mwtssStreamReady, after which the message can be sent.
What happens during the states is that various property values of the message are “baked” into a streamable set of bytes. The more advanced the state is, the more properties have been baked in, and thus changing those properties will not be reflected in the produced set of bytes unless the state is reset back to mwtssNone, at which point the baking starts from scratch again.
The current stream state can be queried by the property StreamState on the message.
“Unbaking” or unstreaming a message means converting a set of bytes back into data that can be accessed via the properties of the message.
TkbmMWTransportUnstreamState = (mwtsusNone, mwtsusStreamReady, mwtsusVerified, mwtsusUnPacked, mwtsusMessageHeaderPropertiesAvailable, mwtsusHeaderPropertiesAvailable, mwtsusBodyPropertiesAvailable, mtwsusBatchedMessagesAvailable, mwtsusAllPropertiesAvailable);
A newly received message will typically start out with its unstream state being mwtsusStreamReady, and progress towards mwtsusAllPropertiesAvailable at the point where you as the developer can use its data.
The current unstream state can be queried via the property UnstreamState of the message.
However if you are making a message gateway or routing mechanism, where the current node may not really be interested in the data itself, you will often only have the message unstreamed to perhaps the mwtsusMessageHeaderPropertiesAvailable state. In some situations you may not even want to get it to that state, as some very basic subject header information is made available to you without having to unstream the data.
You can access the still not unstreamed message in the OnRawMessage event of the transport.
If you were to ask for it in the OnMessage event, it will have been unstreamed to the mwtsusAllPropertiesAvailable state, which means you can access all user data provided in the message.
You can force a specific stream or unstream state by using the EnsureUnstreamState(…) and EnsureStreamState(…) methods of the message itself, but it is usually not required unless you either explicitly work with raw messages, or you are pushing the same message instance on different queues, while changing data between the pushes.
I would generally recommend not doing that unless there is a very specific reason. Instead clone the message and push the altered clone to the other queue.
Quality of Service (QoS)
Each message is assigned a set of QoS flags, which governs how the message is to be treated by transports and queues, and how it may affect other messages already on the queue.
QoS is typically set upon message creation or by setting the byte property SubjectHeader.QoS of the message BEFORE pushing the message to a queue.
By default the value of QoS is 0, but the following values can be OR’ed together:
- KBMMW_TRANSPORT_QOS_CONFIRMED_DELIVERY =$01 – Require confirmed delivery. The message pop is not committed until a positive transport delivery to another node has been confirmed.
- KBMMW_TRANSPORT_QOS_BATCHABLE =$02 – The message can be batched with other messages of matching subject header.
- KBMMW_TRANSPORT_QOS_PRIMARY_IF_BATCHED =$04 – This message should be the first message if it is being batched with other messages.
- KBMMW_TRANSPORT_QOS_REPLACE_BY_SUBJECT =$08 – Upon pushing this message on a queue, remove all other messages in the queue that have the same subject.
- KBMMW_TRANSPORT_QOS_REPLACE_BY_PRIORITY =$10 – Upon pushing this message on a queue, remove all other messages in the queue that have the same priority.
- KBMMW_TRANSPORT_QOS_SHOULD_PERSIST =$80 – This message should be persisted. Designed for queue hotels, but not currently in use.
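Since QoS is a single byte of bit flags, combining and testing flags is plain bit arithmetic. The sketch below redeclares the constants locally, with the values listed above, so that it compiles without kbmMW installed. HasFlag is a hypothetical helper of my own. In a real application you would include the kbmMW unit declaring the constants and assign the combined value to SubjectHeader.QoS of the message before pushing it.

```pascal
program QoSFlags;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

const
  // Redeclared locally with the values listed in the text.
  KBMMW_TRANSPORT_QOS_CONFIRMED_DELIVERY  = $01;
  KBMMW_TRANSPORT_QOS_BATCHABLE           = $02;
  KBMMW_TRANSPORT_QOS_PRIMARY_IF_BATCHED  = $04;
  KBMMW_TRANSPORT_QOS_REPLACE_BY_SUBJECT  = $08;
  KBMMW_TRANSPORT_QOS_REPLACE_BY_PRIORITY = $10;
  KBMMW_TRANSPORT_QOS_SHOULD_PERSIST      = $80;

// Hypothetical helper: True when the given flag bit is set in QoS.
function HasFlag(QoS, Flag: Byte): Boolean;
begin
  Result := (QoS and Flag) <> 0;
end;

var
  QoS: Byte;
begin
  // A message that must be confirmed and may be batched with others.
  QoS := KBMMW_TRANSPORT_QOS_CONFIRMED_DELIVERY or KBMMW_TRANSPORT_QOS_BATCHABLE;

  WriteLn('QoS byte: ', QoS);
  WriteLn('confirmed delivery: ',
    HasFlag(QoS, KBMMW_TRANSPORT_QOS_CONFIRMED_DELIVERY));
  WriteLn('replace by subject: ',
    HasFlag(QoS, KBMMW_TRANSPORT_QOS_REPLACE_BY_SUBJECT));
end.
```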
Message acknowledgement is managed automatically for messages that require confirmed delivery. Each WIB transport has an InboundAckHandler and an OutboundAckHandler property, thru which statistics can be tracked and message acknowledgement can be configured.
The OutboundAckHandler keeps track of messages that have been sent, but not yet acknowledged by the other end. If a message has not been acknowledged within a certain time (Unacked.AutoRollbackTimeout), measured in secs (default 30), the message is rolled back on the source queue, and will be resent later. If the number of messages waiting to be acknowledged reaches a certain threshold (Unacked.Threshold, default 5000), sending of additional messages that have a QoS requiring confirmed delivery can be delayed. The delay is controlled by Unacked.Delay, measured in msecs (default 500).
The InboundAckHandler handles received messages and makes sure to acknowledge them at a regular pace: either when 1000 messages (the default, controlled by the property Unacked.UnAckedTreshold) that require acknowledgement have been received and not yet acknowledged, or every second, whichever comes first.
Messages can be grouped in such a way that they are processed together by the queue processor, regardless of whether the messages have been pushed onto the queue with some delay between them, and with other non grouped messages in between. The messages belonging to the group will be held back until it is specifically indicated that the group is to be released for processing.
Grouping (and other future features) can be defined in the string property message.SubjectHeader.Distribution.
By default it is empty, but it can currently take values like these:
- G:name – Push message on queue under group name and process message sequential via group.
- G:name/H(:60) – Push message on queue under group name and hold message in that group. Do not reject held messages until the group has been idle for more than 60 secs.
- G:name/C – Push message on queue under group name and commit all held messages in that group sequentially.
- G:name/R – Push message on queue under group name and rollback (delete) this and all held messages in that group.
where name is the name of the group. When using G:name/H the message will enter that particular group, and will be held back until a message with either G:name/C or G:name/R comes along. If /C, then all messages within that given group are processed and sent on. If /R, then all messages within that given group that are still held back will be rolled back.
The /H option allows for a :nn value, which indicates the maximum number of seconds that the messages will be held back before being automatically rejected and rolled back.
If G without option is given, the message is not held back, but sent in order with other messages of the same group name.
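To illustrate the shape of the Distribution values, here is a small parser sketch for the four forms listed above. It is purely my own illustration of the syntax as described in this post. ParseDistribution and TGroupAction are hypothetical names, the options are assumed to be upper case, and kbmMW parses the property internally, so you would normally never write such code yourself.

```pascal
program DistributionParse;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  TGroupAction = (gaSequential, gaHold, gaCommit, gaRollback);

// Accepts G:name, G:name/H, G:name/H:nn, G:name/C and G:name/R.
// Returns False for anything else. AHoldSecs is 0 when no :nn was given.
function ParseDistribution(const AValue: string; out AName: string;
  out AAction: TGroupAction; out AHoldSecs: Integer): Boolean;
var
  Rest, Opt: string;
  P: Integer;
begin
  Result := False;
  AName := '';
  AAction := gaSequential;
  AHoldSecs := 0;
  if Copy(AValue, 1, 2) <> 'G:' then
    Exit;
  Rest := Copy(AValue, 3, MaxInt);
  P := Pos('/', Rest);
  if P = 0 then
  begin
    // Plain G:name, no option: processed in order within the group.
    AName := Rest;
    Result := AName <> '';
    Exit;
  end;
  AName := Copy(Rest, 1, P - 1);
  Opt := Copy(Rest, P + 1, MaxInt);
  if AName = '' then
    Exit;
  if Opt = 'C' then
    AAction := gaCommit
  else if Opt = 'R' then
    AAction := gaRollback
  else if Copy(Opt, 1, 1) = 'H' then
  begin
    AAction := gaHold;
    if Length(Opt) > 1 then
    begin
      // Anything after H must be :nn with a non-negative number.
      if Opt[2] <> ':' then
        Exit;
      AHoldSecs := StrToIntDef(Copy(Opt, 3, MaxInt), -1);
      if AHoldSecs < 0 then
        Exit;
    end;
  end
  else
    Exit;
  Result := True;
end;

var
  GroupName: string;
  Action: TGroupAction;
  Secs: Integer;
begin
  if ParseDistribution('G:orders/H:60', GroupName, Action, Secs) then
    WriteLn(GroupName, ' hold=', Action = gaHold, ' secs=', Secs);
  if ParseDistribution('G:orders/C', GroupName, Action, Secs) then
    WriteLn(GroupName, ' commit=', Action = gaCommit);
  WriteLn('bad value accepted: ',
    ParseDistribution('X:orders', GroupName, Action, Secs));
end.
```

A typical sequence would thus be a number of messages sent with G:orders/H:60, followed by one final message with G:orders/C to release the whole group, or G:orders/R to drop it.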