This blog post goes into more detail about one of the fundamentals behind the WIB, namely messages and message queues.
A WIB message is an interface-based, reference-counted class instance. It contains a number of properties specifying the contents itself, security/authentication data, and a subject header, which holds the quality of service (QoS) to be adhered to when handling the message, along with routing and tracking information.
Hence, in terms of a snail mail type letter, the message contains the letter itself (the contents), what is written on the envelope, like sender and recipient information (the routing information), the postal stamp and additional manifest papers stating service level of delivery (the QoS), and the tracking service for that letter, embedded directly into it.
WIB messaging transports act as post offices. They deal only with messages, and usually do so only via message queues, which act as storage bins in the post office.
If you have data contents you wish to send, it is easy to do so by using the SendMessage or SendPrioritizedMessage methods of the WIB transports. Behind the scenes kbmMW then generates the relevant message instance with the provided values and pushes it onto the outbound queue that is attached to the transport.
There are different types of messages, which are used for different purposes. This blog post mostly focuses on messages of type mwmtMessage, which is the type you, as a developer, will usually use:
TkbmMWMessageType = (mwmtUnknown,mwmtRequest,mwmtResponse,mwmtServiceCall, mwmtMessage,mwmtSubscribe,mwmtUnsubscribe,mwmtCache,mwmtThrottle,mwmtEvent,mwmtHandshake);
SendMessage and SendPrioritizedMessage above will always send messages of type mwmtMessage, which is the message type for all sorts of user data. You usually do not specify the message type manually, but use the WIB to generate the relevant message for you.
var
  m:IkbmMWCustomMessageTransportStream;
...
begin
  m:=transport.CreateMessage(Subject,Target,ClientIdent,Priority,UserStream,Args,Options);
  ...
  queue.PushMessage(m);
end;
In the above, the message type generated is also mwmtMessage.
There are methods available for generating the other message types in the situations where they are needed. For example, you can generate mwmtEvent type messages using the CreateEvent or CreatePrioritizedEvent methods of the transport.
If you really want to get advanced and create any supported message type, you can use the CreateTransportStream function. It takes a message type and an interface based information instance.
The message itself is basically portable between any nodes having client or server transports, but the information instance contains transport specific information, which follows the message for as long as it exists in the context of that particular transport.
For example if you know the message will be pushed onto a messaging client transport, you will create the info object as a TkbmMWCustomMessagingClientTransportInfo instance. If it is for pushing via a server side transport, you will create the info object as a TkbmMWCustomMessagingServerTransportInfo instance.
The two different types of info instances have different features because a server node knows how to process requests and return responses for many client nodes simultaneously, while the client node knows how to make requests and optionally wait for a response.
var
  m:IkbmMWCustomMessageTransportStream;
  info:IkbmMWMessagingTransportInfo;
...
  info:=TkbmMWCustomMessagingServerTransportInfo.Create;
  info.Subject:=Subject;
  m:=transport.CreateTransportStream(mwmtSubscribe,info);
...
As mentioned, this is purely for information for very advanced users and is not something you will need to know anything about to use the WIB for 99.9% of all messaging you can think of.
The message queue intro
Each WIB (messaging) transport typically requires two message queues to be attached to it. One that handles messages that are inbound, and one for messages that are outbound.
You can see the message queue as an intermediate storage of the message before it is processed. Since it is indeed a storage, there can be different requirements for that storage.
It could be a temporary storage… meaning that if someone stumbles on the power cord and the computer holding the queues dies, the messages in the temporary queues will have been lost, and the only way to “get it back” is to recreate the messages from whatever data you may have elsewhere. The TkbmMWMemoryMessageQueue is one such temporary message storage.
Or it could be a resilient storage, meaning one that generally will not lose already stored messages, even if the power goes off. The TkbmMWFileStoreMessageQueue is a resilient message storage. Hence after a crash, you will usually be able to get up and running again with no message, or only the last message, being lost. However, you may trade some resiliency for performance, since the file based message queue contains various caching features which you can tune.
Messages are pushed (placed into the queue) and popped off (taken off the queue).
Since message queues are following the FIFO (First In First Out) principle, it means a message first pushed is the message that is first popped.
Pushing a message onto a queue via the PushMessage method can be done either normally or tentatively. A tentative push means that the message is indeed put on the queue, but it is not yet visible and thus will not show up as being on the queue while it is still tentative.
This is something that is used by a couple of advanced features in the WIB:
- Grouping – Messages can be grouped together in the queue and only be released for processing and further sending when all relevant messages for the group have been made ready on the queue. The messages are still processed one by one when they are no longer marked tentative.
- Batching – Messages of same priority, subject and QoS can be automatically batched together to one single message. This can improve performance in some situations where huge amounts of identical messages are sent (the user data can be different).
Usually messages are pushed non tentatively.
Popping messages from a queue always happens in two steps. First, one uses TentativeMessagePop, which returns the oldest message in the queue (although respecting priorities and ignoring tentatively pushed messages). When the message has been processed successfully, for example by doing something with it or pushing it into another queue, one should call CommitMessagePop, which tells the queue that the message has been handled correctly and can be removed from the queue. If something went wrong during the processing, one should call RollbackMessagePop, which tells the queue that something bad happened, and the message is left on the queue.
If enough rollbacks happen for a message, then a stall situation may occur. See later.
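The two-step pop can be illustrated with a small stand-alone model. This is only a sketch: TToyQueue and its methods are hypothetical names that mimic the behaviour described above. They are not kbmMW classes, and the real TentativeMessagePop, CommitMessagePop and RollbackMessagePop signatures may well differ. Priorities and tentative pushes are left out to keep it short.

```delphi
program TwoStepPopSketch;
{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Generics.Collections;

type
  // Hypothetical toy queue, NOT a kbmMW class. It only models the
  // tentative pop / commit / rollback cycle described in the text.
  TToyQueue = class
  private
    FItems: TList<string>;   // index 0 is the oldest message
    FInFlight: Boolean;      // True while a pop is still tentative
  public
    constructor Create;
    destructor Destroy; override;
    procedure Push(const AMsg: string);
    function TentativePop(out AMsg: string): Boolean;
    procedure CommitPop;
    procedure RollbackPop;
    function Count: Integer;
  end;

constructor TToyQueue.Create;
begin
  inherited Create;
  FItems := TList<string>.Create;
end;

destructor TToyQueue.Destroy;
begin
  FItems.Free;
  inherited;
end;

procedure TToyQueue.Push(const AMsg: string);
begin
  FItems.Add(AMsg);
end;

function TToyQueue.TentativePop(out AMsg: string): Boolean;
begin
  Result := (not FInFlight) and (FItems.Count > 0);
  if Result then
  begin
    AMsg := FItems[0];       // hand out the oldest message, keep it stored
    FInFlight := True;
  end;
end;

procedure TToyQueue.CommitPop;
begin
  if FInFlight then
  begin
    FItems.Delete(0);        // processing succeeded: really remove it
    FInFlight := False;
  end;
end;

procedure TToyQueue.RollbackPop;
begin
  FInFlight := False;        // processing failed: message stays queued
end;

function TToyQueue.Count: Integer;
begin
  Result := FItems.Count;
end;

var
  Q: TToyQueue;
  Msg: string;
begin
  Q := TToyQueue.Create;
  try
    Q.Push('first');
    Q.Push('second');
    if Q.TentativePop(Msg) then
      Q.RollbackPop;         // simulate a failure: 'first' will be retried
    if Q.TentativePop(Msg) then
      Q.CommitPop;           // success: 'first' is removed for good
    Writeln(Msg, ' handled, ', Q.Count, ' message(s) left');
  finally
    Q.Free;
  end;
end.
```

The point of the design is that a message is never lost between "taken" and "handled": until the commit, the queue still owns it.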
A message queue can be processed by one of the many message processors that the WIB provides. All messaging transports internally use a message processor for each of the inbound and outbound queues.
The message processor basically pops messages off the queue in various ways, and provides events which can be used for message processing and message processing failure handling.
A message processor can process one or more messages depending on the chosen processing mode:
- mpoAll – Process all messages. When done, finish.
- mpoBlockAll – Wait for a message, then process all. When done, finish.
- mpoOne – Process one message. When done, finish.
- mpoBlockOne – Wait for a message, then process one. When done, finish.
- mpoContinous – Process all messages until Terminate property is set to true.
The following generic message processors currently exist:
- TkbmMWSyncMessageQueueProcessor – Process messages in the same thread that is calling the Process method.
- TkbmMWAsyncMessageQueueProcessor – Process messages in a different thread from the one calling the Process method.
- TkbmMWMultithreadMessageQueueProcessor – Process messages in a number of separate threads concurrently.
- TkbmMWGroupedMultithreadMessageQueueProcessor – Process messages in a number of separate threads concurrently, and obey message grouping requests.
In addition there are a number of specialised transport oriented delivery and inbound message queue processors, which all descend from the above, but have specific handling mechanisms.
Usually the FIFO message order is obeyed, but only for as long as the messages have the same priority. kbmMW’s message queues support prioritisation of messages, such that higher prioritised messages may jump the queue to get in front of other lower prioritised messages already in the queue.
A message can have one of 256 priorities. The default priority is 128 (KBMMW_MESSAGEPRIORITY_NORMAL), highest priority is 0 (KBMMW_MESSAGEPRIORITY_HIGHEST) and lowest priority is 255 (KBMMW_MESSAGEPRIORITY_LOWEST).
Messages of type mwmtSubscribe and mwmtUnsubscribe are normally pushed to a queue given the default priority of KBMMW_MESSAGEPRIORITY_SUBSCRIPTION (which is KBMMW_MESSAGEPRIORITY_NORMAL minus 10).
Subscription messages are usually produced when a node is announcing its subscriptions to other nodes, to tell them what it is interested in receiving. If its announcement has not gone thru, for example due to high load on the queue, the node may never receive the data it wants. For that reason subscription relevant messages are given higher priority than standard data messages.
To send messages with a specific priority, either use one of the SendPrioritizedxxx methods of the transport, or CreateMessage with the relevant priority as argument.
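The ordering rule (lowest priority number first, FIFO among equal priorities) can be sketched without any kbmMW units. The three priority constants and the "minus 10" subscription priority are taken from the text; TToyMessage, MakeMsg and NextToPop are hypothetical helpers made up for this illustration, and the real queue implementation is certainly more involved.

```delphi
program PriorityOrderSketch;
{$APPTYPE CONSOLE}

const
  // Values as stated in the text; redeclared so the sketch is self-contained.
  KBMMW_MESSAGEPRIORITY_HIGHEST      = 0;
  KBMMW_MESSAGEPRIORITY_NORMAL       = 128;
  KBMMW_MESSAGEPRIORITY_LOWEST       = 255;
  KBMMW_MESSAGEPRIORITY_SUBSCRIPTION = KBMMW_MESSAGEPRIORITY_NORMAL - 10;

type
  // Hypothetical record for illustration; not a kbmMW type.
  TToyMessage = record
    Priority: Byte;
    Text: string;
  end;

function MakeMsg(APriority: Byte; const AText: string): TToyMessage;
begin
  Result.Priority := APriority;
  Result.Text := AText;
end;

// Returns the index of the message that would be popped next: the
// lowest priority number wins, and among equal priorities the one
// pushed first (lowest index) wins. Returns -1 for an empty queue.
function NextToPop(const AQueue: array of TToyMessage): Integer;
var
  I: Integer;
begin
  Result := -1;
  for I := 0 to High(AQueue) do
    if (Result < 0) or (AQueue[I].Priority < AQueue[Result].Priority) then
      Result := I;
end;

var
  Q: array of TToyMessage;
begin
  // Pushed in this order: two normal messages, then a subscription.
  SetLength(Q, 3);
  Q[0] := MakeMsg(KBMMW_MESSAGEPRIORITY_NORMAL, 'data a');
  Q[1] := MakeMsg(KBMMW_MESSAGEPRIORITY_NORMAL, 'data b');
  Q[2] := MakeMsg(KBMMW_MESSAGEPRIORITY_SUBSCRIPTION, 'subscribe');
  // The subscription (118) jumps ahead of the normal messages (128).
  Writeln('Next: ', Q[NextToPop(Q)].Text);
end.
```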
Message queues can be limited in how many messages can be put into them before they detect a congestion. By default there is no congestion checking, but it is important in a production system, where the speed of popping messages off the queues cannot be guaranteed to be faster than the speed of pushing new messages onto the queue.
If congestion management is not enabled, you will at some point end up out of memory, or with a very slow system with extremely delayed messages.
Congestion is checked at the time a new message is pushed to the queue.
First the property CongestionThresholdLow is checked if set >0. If so, and the new message's priority is the same as or lower (higher number) than CongestionPriorityLow and the number of messages in the queue is larger than CongestionThresholdLow, then a congestion has happened.
Alternatively, if no congestion has been detected, the same test is done against CongestionPriority and CongestionThreshold (if >0).
Congestion management is enabled by setting a couple of properties on the message queue component.
- CongestionPriorityLow – (Default KBMMW_MESSAGEPRIORITY_LOW) Primary congestion priority test. If a new message has higher priority than this, it is not checked for congestion.
- CongestionThresholdLow – (Default 0 = no checking). The max amount of messages allowed in queue when checking against CongestionPriorityLow.
- CongestionPriority – (Default KBMMW_MESSAGEPRIORITY_VERYHIGH) Secondary congestion priority test. If a new message has higher priority than this, it is not checked for congestion.
- CongestionThreshold – (Default 0 = no checking). The max amount of messages allowed in queue when checking against CongestionPriority.
- CongestionDelay – If Options declare that push delays should occur upon congestion, this is the delay in msecs. Default 500.
- Options – A number of options for how to handle various situations on the queue. These options are interesting in relation to congestion management:
  - mwmqoRejectOnCongestion – If congestion is detected, the new message is instead pushed to the message queue referenced by the message queue property RejectQueue. PushMessage will return true.
  - mwmqoDeleteOnCongestion – If congestion is detected, the new message is deleted. PushMessage will return true.
  - mwmqoDelayOnCongestion – If congestion is detected, a delay will happen before the message is pushed on the queue. PushMessage will return true.
  - mwmqoFailOnCongestion – If congestion is detected, PushMessage will return false.
  - mwmqoPurgeOnCongestion – If congestion is detected, enough messages of same or lower priority are purged from the queue so the new message can be pushed on it. If a push can happen PushMessage will return true else false.
  - mwmqoPurgeAllPrioritiesOnCongestion – If congestion is detected, enough messages of all priorities (starting with lowest first) are purged from the queue so the new message can be pushed on it. If a push can happen PushMessage will return true else false.
  - mwmqoExcludeTentativelyPushedFromCongestion – Do not count tentatively pushed messages in congestion count.
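The two-stage congestion test described above can be written down as a small pure function. This is a sketch of the rule as the text states it, not kbmMW's actual code: the record and the function are hypothetical, and the four numbers in the example are made up (the real values behind KBMMW_MESSAGEPRIORITY_LOW and KBMMW_MESSAGEPRIORITY_VERYHIGH are not given here, so 192 and 32 are stand-ins).

```delphi
program CongestionCheckSketch;
{$APPTYPE CONSOLE}

type
  // Hypothetical settings record mirroring the property names in the text.
  TCongestionSettings = record
    CongestionPriorityLow: Byte;
    CongestionThresholdLow: Integer;  // 0 = no checking
    CongestionPriority: Byte;
    CongestionThreshold: Integer;     // 0 = no checking
  end;

// Models the test done when a new message is pushed. A higher number
// means a lower priority, so ">=" reads as "same or lower priority".
function IsCongested(const S: TCongestionSettings;
  AQueuedCount: Integer; ANewPriority: Byte): Boolean;
begin
  // Primary test against the "low" pair.
  Result := (S.CongestionThresholdLow > 0) and
            (ANewPriority >= S.CongestionPriorityLow) and
            (AQueuedCount > S.CongestionThresholdLow);
  // Secondary test, only if the first one found no congestion.
  if not Result then
    Result := (S.CongestionThreshold > 0) and
              (ANewPriority >= S.CongestionPriority) and
              (AQueuedCount > S.CongestionThreshold);
end;

var
  S: TCongestionSettings;
begin
  // All four numbers are made up for the example.
  S.CongestionPriorityLow := 192;
  S.CongestionThresholdLow := 1000;
  S.CongestionPriority := 32;
  S.CongestionThreshold := 5000;

  Writeln(IsCongested(S, 1500, 200));  // low priority, over the low threshold
  Writeln(IsCongested(S, 1500, 128));  // normal priority, under the high threshold
  Writeln(IsCongested(S, 6000, 128));  // normal priority, over the high threshold
  Writeln(IsCongested(S, 6000, 0));    // highest priority passes both tests
end.
```

With settings like these, low priority traffic is turned away early, while normal traffic is only affected once the queue is much fuller. What happens to a congested push is then decided by the Options listed above.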
If something in an application fails to process a message, the message will usually automatically be left on the queue to be retried in a moment. However, if there is a permanent problem (a programming bug, or some resource has run out), the message will constantly be tentatively popped off the queue and rolled back to the queue again, thus potentially stalling the queue.
For that reason the queues support stall management, which basically describes how many times a message can be rolled back to the queue before stall management kicks in, and what to do with the stalled message.
These properties on the queue component are interesting for stall management:
- StalledThreshold – If >0 (default 0) then if a message is rolled back StalledThreshold times, stall management is activated.
- Options – The following options are interesting for stall management:
  - mwmqoDeleteStalledMessage – The stalled message is deleted from the queue.
  - mwmqoRejectStalledMessage – The stalled message is rejected from the queue and pushed on the RejectQueue.
- StallQueue – If set, the stalled message is pushed onto the StallQueue.
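The stall rule itself is simple enough to model in a few lines. The following is a sketch under the assumption that the queue keeps a rollback counter per message; IsStalled is a hypothetical helper, not a kbmMW function, and the threshold of 3 is a made-up value.

```delphi
program StallSketch;
{$APPTYPE CONSOLE}

// Models the rule in the text: stall management is off while
// StalledThreshold is 0, otherwise it activates once a message has
// been rolled back StalledThreshold times.
function IsStalled(ARollbackCount, AStalledThreshold: Integer): Boolean;
begin
  Result := (AStalledThreshold > 0) and
            (ARollbackCount >= AStalledThreshold);
end;

const
  StalledThreshold = 3;   // made-up value for the example
var
  Rollbacks: Integer;
begin
  Rollbacks := 0;
  // Simulate a message whose processing fails every single time.
  repeat
    Inc(Rollbacks);       // tentative pop failed, message rolled back
  until IsStalled(Rollbacks, StalledThreshold);
  // At this point the queue would delete, reject or move the message,
  // depending on the options and properties listed above.
  Writeln('Stalled after ', Rollbacks, ' rollbacks');
end.
```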
Message states and lifetime
kbmMW’s messages are reference counted, which means they can be put into multiple queues at the same time, but only count once memory wise. The moment no one references the message it automatically dies and is removed from memory.
A message goes thru several states in its life from raw data to packed message and back to raw data.
It is important to know (for advanced users) that a message has a current state, and since the message can be shared between multiple queues, that state is also shared.
Each message goes thru the following packing states (when a message is being prepared by kbmMW for streaming):
TkbmMWTransportStreamState = (mwtssNone, mwtssEnvelopeStart, mwtssMessageHeader, mwtssHeader, mwtssBody, mwtssBatchedMessages, mwtssEnvelopeEnd, mwtssPacked, mwtssSigned, mwtssStreamReady);
It initially has the state mwtssNone, but will automatically, at some point, progress thru the states until it reaches mwtssStreamReady, after which the message can be sent.
What happens during the states is that various property values of the message are “baked” into a streamable set of bytes. The more advanced the state is, the more properties have been baked in, and thus changing the properties will not be reflected in the produced set of bytes, unless the state is reset back to mwtssNone, at which point the baking starts from scratch again.
The current stream state can be queried by the property StreamState on the message.
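Why a property change after baking goes unnoticed can be shown with a deliberately tiny model. Everything in it is hypothetical: TToyMessage collapses the ten packing states into two, and the "bytes" are just a string. It only demonstrates the principle, not how kbmMW streams a message.

```delphi
program BakingSketch;
{$APPTYPE CONSOLE}

type
  // Hypothetical two-state simplification of the packing states.
  TToyState = (tsNone, tsStreamReady);

  // Hypothetical toy message; not a kbmMW class.
  TToyMessage = record
    Subject: string;
    State: TToyState;
    Baked: string;        // stands in for the streamable set of bytes
    procedure EnsureBaked;
    procedure Reset;
  end;

procedure TToyMessage.EnsureBaked;
begin
  // Baking only happens if it has not been done already.
  if State = tsNone then
  begin
    Baked := 'SUBJECT=' + Subject;
    State := tsStreamReady;
  end;
end;

procedure TToyMessage.Reset;
begin
  State := tsNone;
  Baked := '';
end;

var
  M: TToyMessage;
begin
  M.State := tsNone;
  M.Subject := 'A';
  M.EnsureBaked;
  M.Subject := 'B';       // changed after baking
  M.EnsureBaked;          // no effect, the state is already advanced
  Writeln(M.Baked);       // still reflects subject A
  M.Reset;                // back to the initial state
  M.EnsureBaked;
  Writeln(M.Baked);       // now reflects subject B
end.
```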
“Unbaking” or unstreaming a message means converting a set of bytes back into data that can be accessed via the properties of the message.
TkbmMWTransportUnstreamState = (mwtsusNone, mwtsusStreamReady, mwtsusVerified, mwtsusUnPacked, mwtsusMessageHeaderPropertiesAvailable, mwtsusHeaderPropertiesAvailable, mwtsusBodyPropertiesAvailable, mtwsusBatchedMessagesAvailable, mwtsusAllPropertiesAvailable);
A newly received message will typically start out with its unstream state being mwtsusStreamReady, and progress towards mwtsusAllPropertiesAvailable at the point where you as the developer can use its data.
The current unstream state can be queried via the property UnstreamState of the message.
However, if you are making a message gateway or routing mechanism, where the current node may not really be interested in the data itself, you will often only have the message unstreamed to perhaps the mwtsusMessageHeaderPropertiesAvailable state. In some situations you may not even want to get it to that state, as some very basic subject header information is made available for you without having to unstream the data.
You can access the still not unstreamed message in the OnRawMessage event of the transport.
If you were to ask for it in the OnMessage event, it will have been unstreamed to the mwtsusAllPropertiesAvailable state, which means you can access all user data provided in the message.
You can force a specific stream or unstream state by using the EnsureUnstreamState(…) and EnsureStreamState(…) methods of the message itself, but it is usually not required unless you either explicitly work with raw messages, or you are pushing the same message instance on different queues, while changing data between the pushes.
I would generally recommend not doing that unless there is a very specific reason. Instead clone the message and push the altered clone to the other queue.
Quality of Service (QoS)
Each message is assigned a set of QoS flags, which governs how the message is to be treated by transports and queues, and how it may affect other messages already on the queue.
QoS is typically set upon message creation or by setting the byte property SubjectHeader.QoS of the message BEFORE pushing the message to a queue.
By default the value of QoS is 0, but the following values can be OR’ed together:
- KBMMW_TRANSPORT_QOS_CONFIRMED_DELIVERY =$01 – Require confirmed delivery. The message pop is not committed until a positive transport delivery to another node has been confirmed.
- KBMMW_TRANSPORT_QOS_BATCHABLE =$02 – The message can be batched with other messages of matching subject header.
- KBMMW_TRANSPORT_QOS_PRIMARY_IF_BATCHED =$04 – This message should be the first message if it is being batched with other messages.
- KBMMW_TRANSPORT_QOS_REPLACE_BY_SUBJECT =$08 – Upon pushing this message on a queue, remove all other messages in the queue that have the same subject.
- KBMMW_TRANSPORT_QOS_REPLACE_BY_PRIORITY =$10 – Upon pushing this message on a queue, remove all other messages in the queue that have the same priority.
- KBMMW_TRANSPORT_QOS_SHOULD_PERSIST =$80 – This message should be persisted. Designed for queue hotels, but not currently in use.
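Combining and testing the flags is plain bit arithmetic on a byte. The constant values below are the ones listed above, redeclared so that the sketch compiles on its own (in a real project they would come from the kbmMW units). HasQoS is a hypothetical helper, not part of kbmMW.

```delphi
program QoSFlagsSketch;
{$APPTYPE CONSOLE}

const
  // Values as listed in the text.
  KBMMW_TRANSPORT_QOS_CONFIRMED_DELIVERY  = $01;
  KBMMW_TRANSPORT_QOS_BATCHABLE           = $02;
  KBMMW_TRANSPORT_QOS_PRIMARY_IF_BATCHED  = $04;
  KBMMW_TRANSPORT_QOS_REPLACE_BY_SUBJECT  = $08;
  KBMMW_TRANSPORT_QOS_REPLACE_BY_PRIORITY = $10;
  KBMMW_TRANSPORT_QOS_SHOULD_PERSIST      = $80;

// Hypothetical helper: True if AFlag is set in AQoS.
function HasQoS(AQoS, AFlag: Byte): Boolean;
begin
  Result := (AQoS and AFlag) <> 0;
end;

var
  QoS: Byte;
begin
  // Ask for confirmed delivery and allow batching.
  QoS := KBMMW_TRANSPORT_QOS_CONFIRMED_DELIVERY or
         KBMMW_TRANSPORT_QOS_BATCHABLE;
  Writeln(QoS);                                                  // $01 or $02 = 3
  Writeln(HasQoS(QoS, KBMMW_TRANSPORT_QOS_BATCHABLE));           // set
  Writeln(HasQoS(QoS, KBMMW_TRANSPORT_QOS_REPLACE_BY_SUBJECT));  // not set
end.
```

The resulting byte is what would be assigned to SubjectHeader.QoS of the message before it is pushed to a queue.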
Message acknowledgement is managed automatically for messages that require confirmed delivery. Each WIB transport has an InboundAckHandler and an OutboundAckHandler property, thru which message acknowledgement can be configured and its statistics tracked.
The OutboundAckHandler keeps track of messages that have been sent, but not yet acknowledged by the other end. If a message has not been acknowledged within a certain time (Unacked.AutoRollbackTimeout, measured in secs, default 30), the message is rolled back on the source queue and will be resent later. If the number of messages waiting to be acknowledged reaches a certain threshold (Unacked.Threshold, default 5000), it can delay sending additional messages that have a QoS that requires guaranteed delivery. Unacked.Delay, measured in msecs (default 500), controls the delay.
The InboundAckHandler handles received messages and makes sure to acknowledge them at a regular pace: either when 1000 (the default, controlled by the property Unacked.UnAckedTreshold) messages that require acknowledgement have been received and not yet acknowledged, or every second, whichever comes first.
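The inbound pacing rule ("1000 messages or one second, whichever comes first") can be expressed as a single condition. The helper below is hypothetical and only restates the rule from the text. That nothing is acknowledged while no messages are pending is an assumption of this sketch, not something the text states.

```delphi
program AckPacingSketch;
{$APPTYPE CONSOLE}

const
  UnackedThreshold = 1000;  // default stated in the text
  AckIntervalMs    = 1000;  // "every second"

// Hypothetical helper: True when the receiver should acknowledge now,
// i.e. enough messages are pending or a second has passed.
// Assumption: with nothing pending there is nothing to acknowledge.
function ShouldAckNow(APendingCount, AMsSinceLastAck: Integer): Boolean;
begin
  Result := (APendingCount > 0) and
            ((APendingCount >= UnackedThreshold) or
             (AMsSinceLastAck >= AckIntervalMs));
end;

begin
  Writeln(ShouldAckNow(1000, 10));   // threshold reached
  Writeln(ShouldAckNow(5, 1000));    // a second has passed
  Writeln(ShouldAckNow(5, 10));      // neither yet
end.
```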
Messages can be grouped in such a way that they are processed together by the queue processor, regardless of whether the messages have been pushed onto the queue with some delay between them, and with other non grouped messages in between. The messages belonging to the group will be held back until it is specifically indicated that the group is to be released for processing.
Grouping (and other future features) can be defined in the string property message.SubjectHeader.Distribution.
By default it is empty, but it can currently take values like this:
- G:name – Push message on queue under group name and process message sequentially via group.
- G:name/H(:60) – Push message on queue under group name and hold message in that group. Do not reject held messages until the group has been idle for more than 60 secs.
- G:name/C – Push message on queue under group name and commit all held messages in that group sequentially.
- G:name/R – Push message on queue under group name and rollback (delete) this and all held messages in that group.
where name is the name of the group. When using G:name/H the message will enter that particular group, and will be held back until a message with either G:name/C or G:name/R comes along. If /C, then all messages within that given group are processed and sent on. If /R, then all messages within that given group that are still held back will be rolled back.
The /H option allows for a :nn value, which indicates the maximum number of seconds that the messages will be held back before being auto rejected and rolled back.
If G without option is given, the message is not held back, but sent in order with other messages of the same group name.
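To make the syntax concrete, here is a small parser for the values listed above. It is a sketch only: TGroupAction and ParseDistribution are hypothetical names, and it assumes that the parentheses in G:name/H(:60) merely mark the :60 part as optional, so that the actual string reads G:name/H:60. kbmMW's own parsing may differ, for example in case sensitivity or in how malformed values are treated.

```delphi
program DistributionSketch;
{$APPTYPE CONSOLE}

uses
  System.SysUtils;

type
  // Hypothetical enumeration of the four documented variants.
  TGroupAction = (gaSequential, gaHold, gaCommit, gaRollback);

// Splits a Distribution value into group name, action and hold time.
// Returns False if the value does not match the forms in the text.
function ParseDistribution(const ADist: string; out AGroup: string;
  out AAction: TGroupAction; out AHoldSecs: Integer): Boolean;
var
  Rest, Opt: string;
  P: Integer;
begin
  Result := False;
  AGroup := '';
  AAction := gaSequential;
  AHoldSecs := 0;
  if Copy(ADist, 1, 2) <> 'G:' then
    Exit;
  Rest := Copy(ADist, 3, MaxInt);
  P := Pos('/', Rest);
  if P = 0 then
  begin
    AGroup := Rest;                  // plain G:name
    Exit(AGroup <> '');
  end;
  AGroup := Copy(Rest, 1, P - 1);
  Opt := Copy(Rest, P + 1, MaxInt);
  if Opt = 'C' then
    AAction := gaCommit
  else if Opt = 'R' then
    AAction := gaRollback
  else if Copy(Opt, 1, 1) = 'H' then
  begin
    AAction := gaHold;
    // Optional ":nn" hold time in seconds after the H.
    if Length(Opt) > 1 then
      if (Copy(Opt, 2, 1) <> ':') or
         not TryStrToInt(Copy(Opt, 3, MaxInt), AHoldSecs) then
        Exit;
  end
  else
    Exit;
  Result := AGroup <> '';
end;

var
  Group: string;
  Action: TGroupAction;
  Secs: Integer;
begin
  if ParseDistribution('G:orders/H:60', Group, Action, Secs) then
    Writeln(Group, ' hold=', Action = gaHold, ' secs=', Secs);
end.
```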
3 thoughts on “kbmMW WIB #2 – The Wide Information Bus – Messages and message queues”
Long time customer (~2002). I’ve asked about this before a few times, and something always comes up that pulls me away;
I’m wanting to use one open port/ip address and combine the WIB and standard message processing. I looked at the demo that is in the samples/demos folder, and it doesn’t have much depth. Can you create a demo with some depth, say a service that has 2 or 3 RPC services and handles WIB on the server, then on the client, call the RPC services and publish/subscribe a message from a client form, and on the same form call one of the services that returns a dataset?
I’m not wrapping my head around how I can selectively subscribe to a service (or publish a subscription on the client, say a chat app for example), and then also send an RPC request. I guess I’m wondering how to get a message from the server to a specific form. Server side, pick off the requests, route the messages to the message handlers/queues, and then route the RPC correctly (preferably not in a single thread, so the requests don’t become a bottleneck).
How about Security? How do you secure the WIB? Is just using SSL going to do it? I thought I read somewhere that due to the routing, the message have to be clear text? Maybe an example of securing messaging?
Long time kbmMW customer
I am working on a screen cast + source code showing a multitier WIB driven rolodex which should incorporate most of the questions you have.
However to answer your questions.
First, if you choose to use the WIB, you can make request/response and async push operations concurrently on the same transport and port. You will however be limited to using the protocols that are available that are WIB compatible (that generally boils down to the binary protocol).
If you would like to provide a REST server too, accessible from web browsers, you would need an additional transport (with additional port) which handles the REST or AJAX protocol, since it operates in a different way compared to the WIB.
Basically all message routing is based on subscriptions. Your WIB transport would (in the simplest fashion) subscribe for > which means that it subscribes for all types of messages from anybody to anybody. This is an OK starting point, but the subscriptions definitely must be made more specific before your multi tier system goes into production (or even user test).
You will often use a hub/spoke transport topology, where your traditional application server will act as a hub node, and the traditional clients as spoke nodes.
Hence the clients connect as usual to the server (in a similar way as you are used to).
Each of the clients will usually announce the subjects it subscribes on to the hub, and the hub will only deliver messages to spokes that are subscribing for those messages. But it also means that a message may be delivered to a number of spokes, if multiple spokes' subscriptions match the subject of that message.
A request is simply a message. It differs by having a subject starting with REQ.something. Hence the server must be able to receive messages that have subjects starting with REQ to be able to process them. With the initial simple subscription for > (which is a catch all subscription), the server will get all REQ messages sent by any client node.
When a server has processed a REQ message, it will reply with a RES message. The reply will go out with an address (as part of the subject) that includes the node ID of the specific node that originally made the request. For that reason, nodes should only subscribe for RES.xxxxx subjects that contain their own node ID. AutoSetupResponseSubscription is a simple property on the client transport to set to true before connecting. Then the relevant RES..xxxx subscription is set up on the client node.
A client may do the regular request with a blocking wait for the response, or it may ask for the response to be async, in which case an event or an anonymous method is called.
Other messages can be sent async by simply using SendMessage from any node.
Did you get time to complete your screen cast with the RoloDex demo? I didn’t mention this in my original email, but I’d like to have the RPC stay RPC and the Messaging stay Messaging, meaning I don’t want the messaging to contain datasets that are currently passed back and forth using RPC. Obviously this is a long running project, I’ve got a few projects I’m juggling now, and will be back on this one for this request again soon.