Quantcast

Queue, with timeout. (Thread.notify...)

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Queue, with timeout. (Thread.notify...)

tim becker-4
Hi,

I'm trying to build a message queue that can receive messages, blocks
until messages arrive and has the ability to only block for a limited
amount of time. Unfortunately, the build-in Queue doesn't support
timeout behaviour, so I went about implementing a queue myself. I
missed java's Object.wait method, i.e. using a plain object to notify
the blocking thread to resume .

I ended up using `thread.join(timeout)` which blocks the current
thread until the thread that `join` was called on finishes (or until
timeout is reached). My queue doesn't need to join any particular
thread, though, it just needs to be notified when a new message is
added to the queue. So I needed to create a `dummy` thread solely to
provide notification. What I came up with is this:

require 'thread'

class MessageQueue
  def initialize
    @lock = Mutex.new
    @t = Thread.new{Thread.stop}  # this thread exists solely for notification
    @queue = []
  end

  def enqueue msg
    @lock.synchronize {
      @queue.push msg
      t=@t
      @t=Thread.new{Thread.stop}
      t.kill# notifiy waiting thread
    }
  end

  def dequeue timeout=nil
    msg = nil
    stop = Time.now+timeout if timeout
    while msg == nil
      @lock.synchronize {msg = @queue.shift}
      break if timeout && Time.new >= stop
      @t.join(timeout) unless msg # wait for new message
    end #while
    msg
  end

end

While this works fine, it seems like a bit of a kludge because I have
to create/misuses threads solely to provided notification to the
blocking thread.

Any thought? Am I missing an easier/build-in method to block a thread
only for a limited about of time?

Thanks,
   -tim

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queue, with timeout. (Thread.notify...)

Avdi Grimm-2
On 5/12/07, Tim Becker <[hidden email]> wrote:
> I'm trying to build a message queue that can receive messages, blocks
> until messages arrive and has the ability to only block for a limited
> amount of time. Unfortunately, the build-in Queue doesn't support
> timeout behaviour, so I went about implementing a queue myself. I
> missed java's Object.wait method, i.e. using a plain object to notify
> the blocking thread to resume .

Use 'timeout':

require 'timeout'
require 'thread'

q = Queue.new

Thread.new do
  q.push "thing"
  sleep 10
end

begin
  Timeout::timeout(1) do
    q.pop
    q.pop
  end
rescue Timeout::Error
  puts "timed out!"
end


--
Avdi

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queue, with timeout. (Thread.notify...)

MenTaLguY
In reply to this post by tim becker-4
On Sat, 2007-05-12 at 22:39 +0900, Tim Becker wrote:
> Any thought? Am I missing an easier/build-in method to block a thread
> only for a limited about of time?

Unless/until the built-in classes support timeouts, using a thread to
track the timeout is sadly your best option.

However, there are a number of problems with the specific approach
you've used here (e.g. not all accesses to @t are protected by the
mutex)... here's an alternate implementation...

        require 'thread'
        begin
          require 'fastthread'
        rescue LoadError
        end
       
        class MessageQueue
          def initialize
            @lock = Mutex.new
            @messages = []
            @readers = []
          end
       
          def enqueue msg
            @lock.synchronize do
              unless @readers.empty?
                @readers.pop << msg
              else
                @messages.push msg
              end
            end
          end
       
          def dequeue timeout=nil
            timeout_thread = nil
            begin
              reader = nil
              @lock.synchronize do
             unless @messages.empty?
               # fast path
               return @messages.shift
             else
                  reader = Queue.new
                  @readers.push reader
                  if timeout
                    timeout_thread = Thread.new do
                      sleep timeout
                      @lock.synchronize do
                        @readers.delete reader
                        reader << nil
                      end
                    end
                  end
                end
              end
              # either timeout or writer will send to us
              reader.shift
            ensure
              # (try to) clean up timeout thread
              timeout_thread.run if timeout_thread
            end
          end
        end

Queue may seem sort of heavyweight to use for "callbacks" this way, but
if you use fastthread they are pretty inexpensive, and queues take care
of a _lot_ of the bookkeeping you would need to do to make this safe
otherwise.

One remaining issue is that it's possible for the timeout thread to
sleep _after_ the call to timeout_thread.run, so the cleanup isn't
necessarily that effective.  However, Thread#kill (or Thread#raise)
isn't an option, since those can leave things in an inconsistent state.

A better solution would be to maintain a dedicated thread for tracking
timeouts which you don't have to worry about cleaning up -- I'll be
releasing a library to do that soon, but for now the above solution is
the best I can do.

-mental

signature.asc (196 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queue, with timeout. (Thread.notify...)

MenTaLguY
In reply to this post by Avdi Grimm-2
On Sun, 2007-05-13 at 04:17 +0900, Avdi Grimm wrote:
> Use 'timeout':

I'd be cautious of using timeout, since not all of stdlib is safe with
respect to asynchronous exceptions (it's sometimes very difficult to do
properly).  I think it's okay with the old thread.rb implementation of
Queue specifically, but I'm not 100% sure about the current C
implementation or other Ruby implementations like JRuby.

-mental

signature.asc (196 bytes) Download Attachment
Loading...