Re: Pattern for queue + thread pool with in-order processing?
meselfo wrote:
Im writing a server that accepts connections from multiple network
clients. Messages from these client are submitted as tasks to worker
threads through a bounded queue. Messages coming from the _SAME_
client needs to be processed in the order they were received from
that
client. How do I enforce this requirement?
Im using ThreadPoolExecutor from the java api which has a built in
queue for submitting tasks. If I stamp messages from the same client
with a sequence number and make worker threads aware of this number
so
that they pause processing a message if the number indicates that
another message from the same client should finish processing first
then i'm effectively blocking that worker thread - thus a client
sending many messages fast could reduce the number of effective
worker
threads.
Right, you don't want to change how the worker threads execute. You
want to change the order in which messages are pulled off the queue.
There must be some sort of pattern on how to process some
tasks submitted to a pool of worker thread in order??
There are two things you need to ensure:
1. No messages from client A get processed as long as any other
messages from client A are still being processed.
2. When it's safe to process another message from client A, process
the earliest one.
You get 2 for free, as it's how queues work. All you really need to
do is:
A. Keep data about which clients have messages currently being
processed. The simplest thing here is a Set into which clients are
added when a job starts and removed when it ends.
B. Create a queue implementation that skips messages whose clients are
in the set from part A. That is, if messages from C1 and C2, but not
C3, are currently being processed, and the queue looks like
C1.5, C1.6, C2.4, C3.6
the next item returned would be C3.6. Now, if the C1 processing
completes, the next item returned will be C1.5.
The tricky bit here, I think, will be implementing your queue's poll()
method, since it needs to check whether to complete both when
i. a new item is added to the queue, and
ii. message-processing completes (i.e. a client is removed from the
Set), which may make an existing queue item newly eligible to be
returned.