Re: Pattern for queue + thread pool with in-order processing?
 
Mike Schilling wrote:
meselfo wrote:
I'm writing a server that accepts connections from multiple network
clients. Messages from these clients are submitted as tasks to worker
threads through a bounded queue. Messages coming from the _SAME_
client need to be processed in the order they were received from that
client. How do I enforce this requirement?
I'm using ThreadPoolExecutor from the Java API, which has a built-in
queue for submitting tasks. If I stamp messages from the same client
with a sequence number and make worker threads aware of this number, so
that they pause processing a message if the number indicates that
another message from the same client should finish processing first,
then I'm effectively blocking that worker thread. Thus a client
sending many messages fast could reduce the number of effective worker
threads.
Right, you don't want to change how the worker threads execute.  You
want to change the order in which messages are pulled off the queue.
There must be some sort of pattern for processing tasks submitted
to a pool of worker threads in order?
There are two things you need to ensure:
1. No messages from client A get processed as long as any other
messages from client A are still being processed.
2. When it's safe to process another message from client A, process
the earliest one.
You get 2 for free, as it's how queues work.  All you really need to
do is:
Here's a simpler version (since it doesn't require implementing a new
kind of BlockingQueue):
A. Keep data about which clients have messages currently being
processed.  The simplest thing here is a Set into which clients are
added when a job starts and removed when it ends.
This changes slightly:
A. Keep data about which clients have messages which have been queued
to the ThreadPoolExecutor but are not yet fully processed.  The
simplest thing here is a Set into which clients are added when a
message is queued and removed when it is done being processed.
B. Create a queue implementation that skips messages whose clients are
in the set from part A.  That is, if messages from C1 and C2, but not
C3, are currently being processed, and the queue looks like
   C1.5, C1.6, C2.4, C3.6
the next item returned would be C3.6.  Now, if the C1 processing
completes, the next item returned will be C1.5.
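For illustration, the selection rule in B could be sketched in Java
roughly like this. It only shows the "skip busy clients" scan over a
plain List, not a real BlockingQueue: the Message class, pollEligible,
and the String client ids are made-up names, and there is no locking
or blocking.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

class SkipBusyDemo {
    /** Hypothetical message type: a client id plus a per-client sequence number. */
    static final class Message {
        final String client;
        final int seq;
        Message(String client, int seq) { this.client = client; this.seq = seq; }
        @Override public String toString() { return client + "." + seq; }
    }

    /** Removes and returns the oldest message whose client is not busy, or null if there is none. */
    static Message pollEligible(List<Message> pending, Set<String> busy) {
        for (Iterator<Message> it = pending.iterator(); it.hasNext(); ) {
            Message m = it.next();
            if (!busy.contains(m.client)) {
                it.remove();   // take it out of the queue
                return m;
            }
        }
        return null;           // every queued message belongs to a busy client
    }

    public static void main(String[] args) {
        List<Message> pending = new ArrayList<>(List.of(
                new Message("C1", 5), new Message("C1", 6),
                new Message("C2", 4), new Message("C3", 6)));
        // C1 and C2 are being processed, so C3.6 is the first eligible message.
        System.out.println(pollEligible(pending, Set.of("C1", "C2"))); // prints C3.6
        // C1 has finished; C2 and C3 are busy, so C1.5 comes next.
        System.out.println(pollEligible(pending, Set.of("C2", "C3"))); // prints C1.5
    }
}
```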
Instead of this,
B1. Use a standard BlockingQueue implementation for the
  ThreadPoolExecutor, but instead of adding new messages to it directly:
B2.  For each client, create a LinkedList of messages to be processed,
  which will eventually be moved to the BlockingQueue, and do the
  following:
    WHEN a message is received from a client
        IF the client is currently in the Set described above
        THEN
            Append that message to that client's LinkedList.
        ELSE
            Append it directly to the BlockingQueue
            Add the client to the Set
    WHEN a worker finishes processing a message from a client
        IF the client's LinkedList is empty
        THEN
            Remove the client from the Set
        ELSE
            Move the oldest message from the client's LinkedList to
            the BlockingQueue
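
The two WHEN rules above could be sketched in Java along these lines.
This is a rough sketch, not tested production code. PerClientDispatcher,
submit, and the String client ids are names I made up. A single Map
stands in for both the Set and the per-client lists (a client is "in
the Set" while it has a map entry), and an ArrayDeque replaces the
LinkedList. It also assumes the pool never rejects a task; with a
bounded queue you would need to decide what happens when execute()
throws.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch: runs tasks for the same client one at a time, in submission order. */
class PerClientDispatcher {
    private final Executor pool; // e.g. a ThreadPoolExecutor with a standard BlockingQueue
    // A key is present while its client has a message queued or running (the "Set");
    // the value holds that client's messages that are waiting their turn.
    private final Map<String, Queue<Runnable>> backlog = new HashMap<>();

    PerClientDispatcher(Executor pool) { this.pool = pool; }

    /** WHEN a message is received from a client. */
    synchronized void submit(String client, Runnable task) {
        Queue<Runnable> waiting = backlog.get(client);
        if (waiting != null) {
            waiting.add(task);                        // client busy: hold the message back
        } else {
            backlog.put(client, new ArrayDeque<>());  // add the client to the "Set"
            pool.execute(wrap(client, task));         // hand the message to the pool
        }
    }

    /** Wraps a task so the dispatcher is notified when it ends, even if it throws. */
    private Runnable wrap(String client, Runnable task) {
        return () -> {
            try { task.run(); } finally { finished(client); }
        };
    }

    /** WHEN a worker finishes processing a message from a client. */
    private synchronized void finished(String client) {
        Runnable next = backlog.get(client).poll();
        if (next == null) {
            backlog.remove(client);                   // nothing waiting: client is idle again
        } else {
            pool.execute(wrap(client, next));         // move the oldest waiting message to the pool
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        PerClientDispatcher dispatcher = new PerClientDispatcher(pool);
        CountDownLatch done = new CountDownLatch(4);
        for (String id : new String[] {"C1.5", "C1.6", "C2.4", "C3.6"}) {
            String client = id.substring(0, 2);
            dispatcher.submit(client, () -> { System.out.println(id); done.countDown(); });
        }
        done.await();    // every task has run, so nothing is left to hand to the pool
        pool.shutdown();
    }
}
```

Worker threads never wait on each other here: a message for a busy
client sits in that client's own list and only reaches the pool once
the previous one is done, so C1.5 always prints before C1.6 while the
order relative to C2 and C3 is not fixed.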