Hi,
I'm trying to build a message queue that can receive messages, blocks until messages arrive, and can block for only a limited amount of time. Unfortunately, the built-in Queue doesn't support timeout behaviour, so I went about implementing a queue myself. I missed Java's Object.wait method, i.e. using a plain object to notify the blocking thread to resume.

I ended up using `thread.join(timeout)`, which blocks the current thread until the thread that `join` was called on finishes (or until the timeout is reached). My queue doesn't need to join any particular thread, though; it just needs to be notified when a new message is added to the queue. So I needed to create a `dummy` thread solely to provide notification. What I came up with is this:

    require 'thread'

    class MessageQueue
      def initialize
        @lock = Mutex.new
        @t = Thread.new{Thread.stop} # this thread exists solely for notification
        @queue = []
      end

      def enqueue msg
        @lock.synchronize {
          @queue.push msg
          t=@t
          @t=Thread.new{Thread.stop}
          t.kill # notify waiting thread
        }
      end

      def dequeue timeout=nil
        msg = nil
        stop = Time.now+timeout if timeout
        while msg == nil
          @lock.synchronize {msg = @queue.shift}
          break if timeout && Time.new >= stop
          @t.join(timeout) unless msg # wait for new message
        end #while
        msg
      end
    end

While this works fine, it seems like a bit of a kludge because I have to create and misuse threads solely to provide notification to the blocking thread.

Any thoughts? Am I missing an easier/built-in method to block a thread for only a limited amount of time?

Thanks,
-tim
On 5/12/07, Tim Becker <[hidden email]> wrote:
> I'm trying to build a message queue that can receive messages, blocks
> until messages arrive and has the ability to only block for a limited
> amount of time. Unfortunately, the built-in Queue doesn't support
> timeout behaviour, so I went about implementing a queue myself. I
> missed Java's Object.wait method, i.e. using a plain object to notify
> the blocking thread to resume.

Use 'timeout':

    require 'timeout'
    require 'thread'

    q = Queue.new

    Thread.new do
      q.push "thing"
      sleep 10
    end

    begin
      Timeout::timeout(1) do
        q.pop
        q.pop
      end
    rescue Timeout::Error
      puts "timed out!"
    end

--
Avdi
In reply to this post by tim becker-4
On Sat, 2007-05-12 at 22:39 +0900, Tim Becker wrote:
> Any thoughts? Am I missing an easier/built-in method to block a thread
> only for a limited amount of time?

Unless/until the built-in classes support timeouts, using a thread to track the timeout is sadly your best option. However, there are a number of problems with the specific approach you've used here (e.g. not all accesses to @t are protected by the mutex)... here's an alternate implementation...

    require 'thread'
    begin
      require 'fastthread' # optional; fall back to the stock implementations
    rescue LoadError
    end

    class MessageQueue
      def initialize
        @lock = Mutex.new
        @messages = []
        @readers = []
      end

      def enqueue msg
        @lock.synchronize do
          unless @readers.empty?
            @readers.pop << msg
          else
            @messages.push msg
          end
        end
      end

      def dequeue timeout=nil
        timeout_thread = nil
        begin
          reader = nil
          @lock.synchronize do
            unless @messages.empty?
              # fast path
              return @messages.shift
            else
              reader = Queue.new
              @readers.push reader
              if timeout
                timeout_thread = Thread.new do
                  sleep timeout
                  @lock.synchronize do
                    @readers.delete reader
                    reader << nil
                  end
                end
              end
            end
          end
          # either timeout or writer will send to us
          reader.shift
        ensure
          # (try to) clean up timeout thread
          timeout_thread.run if timeout_thread
        end
      end
    end

Queue may seem sort of heavyweight to use for "callbacks" this way, but if you use fastthread they are pretty inexpensive, and queues take care of a _lot_ of the bookkeeping you would need to do to make this safe otherwise.

One remaining issue is that it's possible for the timeout thread to sleep _after_ the call to timeout_thread.run, so the cleanup isn't necessarily that effective. However, Thread#kill (or Thread#raise) isn't an option, since those can leave things in an inconsistent state.

A better solution would be to maintain a dedicated thread for tracking timeouts which you don't have to worry about cleaning up -- I'll be releasing a library to do that soon, but for now the above solution is the best I can do.

-mental
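The "dedicated thread for tracking timeouts" idea can be sketched roughly as follows. This is not the library mentioned above, which is not shown in the thread; the class name `TimeoutTracker` and its methods `after` and `cancel` are hypothetical. It also assumes a Ruby version in which `ConditionVariable#wait` accepts a timeout argument (1.9.2 or later, which postdates this thread).

```ruby
require 'thread'

# Hypothetical sketch: one long-lived thread fires callbacks at their deadlines,
# so callers never have to create or clean up a timeout thread of their own.
class TimeoutTracker
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @pending = [] # [deadline, callback] pairs
    @thread = Thread.new { run }
  end

  # Schedule the block to run after `seconds`; returns a handle for #cancel.
  def after(seconds, &callback)
    entry = [Time.now + seconds, callback]
    @lock.synchronize do
      @pending << entry
      @cond.signal # the nearest deadline may have changed; let the tracker re-check
    end
    entry
  end

  # Forget a scheduled callback if it has not fired yet.
  def cancel(entry)
    @lock.synchronize { @pending.delete_if { |e| e.equal?(entry) } }
  end

  private

  def run
    loop do
      due = []
      @lock.synchronize do
        now = Time.now
        due, @pending = @pending.partition { |deadline, _| deadline <= now }
        if due.empty?
          nearest = @pending.map { |deadline, _| deadline }.min
          if nearest
            @cond.wait(@lock, nearest - now) # sleep until the nearest deadline
          else
            @cond.wait(@lock)                # nothing pending; sleep until signalled
          end
        end
      end
      due.each { |_, callback| callback.call } # run callbacks outside the lock
    end
  end
end
```

With something like this, `dequeue` could register a callback that pushes `nil` to the reader's queue and cancel it once a message arrives, instead of spawning a thread per call. A callback that raises would terminate the tracker thread in this sketch, so real code would need to guard against that.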
In reply to this post by Avdi Grimm-2
On Sun, 2007-05-13 at 04:17 +0900, Avdi Grimm wrote:
> Use 'timeout':

I'd be cautious of using timeout, since not all of stdlib is safe with respect to asynchronous exceptions (it's sometimes very difficult to do properly). I think it's okay with the old thread.rb implementation of Queue specifically, but I'm not 100% sure about the current C implementation or other Ruby implementations like JRuby.

-mental
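For readers on a newer Ruby, a timed wait that involves no asynchronous exceptions can be sketched with a condition variable, which is the closest analogue to Java's Object.wait. This is only a sketch: it assumes a Ruby version in which `ConditionVariable#wait` accepts a timeout argument (1.9.2 or later, which postdates this thread), and the class name `TimedQueue` is hypothetical.

```ruby
require 'thread'

# Hypothetical class, not from the thread: a queue whose dequeue can time out.
class TimedQueue
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @messages = []
  end

  def enqueue(msg)
    @lock.synchronize do
      @messages.push msg
      @cond.signal # wake one waiting reader, if any
    end
  end

  # Returns the next message, or nil if none arrives within `timeout` seconds.
  # With no timeout, blocks until a message is available.
  def dequeue(timeout = nil)
    deadline = timeout && (Time.now + timeout)
    @lock.synchronize do
      # Loop because a wakeup does not guarantee that a message is still there.
      while @messages.empty?
        if deadline
          remaining = deadline - Time.now
          return nil if remaining <= 0
          @cond.wait(@lock, remaining)
        else
          @cond.wait(@lock)
        end
      end
      @messages.shift
    end
  end
end
```

Because the waiting thread is only ever woken by `signal` or by its own timeout expiring, nothing is interrupted mid-operation, which sidesteps the concern about Timeout raising into arbitrary code. One limitation of this sketch is that a `nil` message cannot be told apart from a timeout.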
