Re: Pattern for queue + thread pool with in-order processing?
Mike Schilling wrote:
meselfo wrote:
I'm writing a server that accepts connections from multiple network
clients. Messages from these clients are submitted as tasks to worker
threads through a bounded queue. Messages coming from the _SAME_
client need to be processed in the order they were received from that
client. How do I enforce this requirement?
I'm using ThreadPoolExecutor from the Java API, which has a built-in
queue for submitting tasks. If I stamp messages from the same client
with a sequence number and make worker threads aware of this number, so
that they pause processing a message if the number indicates that
another message from the same client should finish processing first,
then I'm effectively blocking that worker thread. Thus a client
sending many messages fast could reduce the number of effective worker
threads.
Right, you don't want to change how the worker threads execute. You
want to change the order in which messages are pulled off the queue.
There must be some sort of pattern on how to process, in order, some
of the tasks submitted to a pool of worker threads?
There are two things you need to ensure:
1. No messages from client A get processed as long as any other
messages from client A are still being processed.
2. When it's safe to process another message from client A, process
the earliest one.
You get 2 for free, as it's how queues work. All you really need to
do is:
Here's a simpler version (since it doesn't require implementing a new
kind of BlockingQueue)
A. Keep data about which clients have messages currently being
processed. The simplest thing here is a Set into which clients are
added when a job starts and removed when it ends.
This changes slightly:
A. Keep data about which clients have messages which have been queued
to the ThreadPoolExecutor but are not yet fully processed. The
simplest thing here is a Set into which clients are added when a
message is queued and removed when it is done being processed.
B. Create a queue implementation that skips messages whose clients are
in the set from part A. That is, if messages from C1 and C2, but not
C3, are currently being processed, and the queue looks like
C1.5, C1.6, C2.4, C3.6
the next item returned would be C3.6. Now, if the C1 processing
completes, the next item returned will be C1.5.
Instead of this,
B1. Use a standard BlockingQueue implementation for the
ThreadPoolExecutor, but instead of adding new messages to it directly:
B2. For each client, create a LinkedList of messages to be processed,
which will eventually be moved to the BlockingQueue, and do the
following:
WHEN a message is received from a client
    IF the client is currently in the Set described above
    THEN
        Append that message to that client's LinkedList
    ELSE
        Append it directly to the BlockingQueue
        Add the client to the Set
WHEN a worker finishes processing a client's message
    IF the client's LinkedList is empty
    THEN
        Remove the client from the Set
    ELSE
        Move the oldest message from the client's LinkedList to
        the BlockingQueue
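
For what it's worth, here is a rough Java sketch of the two WHEN rules
above. It is only one way to write it, not a tested implementation. The
class name PerClientOrderedExecutor and its methods are made up for
illustration. It folds the Set and the per-client lists into a single
Map (a key being present means "the client is in the Set"), and it
assumes the pool never rejects a task, so with a bounded queue you
would still need to decide what to do on rejection.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical helper: runs tasks for the same client one at a time, in arrival order. */
class PerClientOrderedExecutor<K> {
    // The underlying pool, e.g. a ThreadPoolExecutor with its own BlockingQueue.
    // Assumption: it does not reject tasks; rejection handling is omitted here.
    private final Executor pool;

    // Key present = client is "in the Set"; value = that client's waiting messages.
    private final Map<K, Queue<Runnable>> pending = new HashMap<>();

    PerClientOrderedExecutor(Executor pool) {
        this.pool = pool;
    }

    /** WHEN a message is received from a client. */
    public synchronized void submit(K client, Runnable task) {
        Queue<Runnable> waiting = pending.get(client);
        if (waiting != null) {
            // Client already has a message queued or running: park this one.
            waiting.add(task);
        } else {
            // Add the client to the "Set" and hand the message to the pool.
            pending.put(client, new ArrayDeque<>());
            pool.execute(wrap(client, task));
        }
    }

    /** Wraps a task so the bookkeeping runs even if the task throws. */
    private Runnable wrap(K client, Runnable task) {
        return () -> {
            try {
                task.run();
            } finally {
                finished(client);
            }
        };
    }

    /** WHEN a worker finishes processing a client's message. */
    private synchronized void finished(K client) {
        Runnable next = pending.get(client).poll();
        if (next == null) {
            pending.remove(client);            // nothing waiting: remove from the "Set"
        } else {
            pool.execute(wrap(client, next));  // move the oldest waiting message to the pool
        }
    }
}
```

The network threads would call submit(clientId, task) instead of calling
execute() on the ThreadPoolExecutor directly. Worker threads never
block waiting on another message; a busy client just accumulates
entries in its own list while other clients keep the pool occupied.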