class Object
Constants
- COLNAMES
- COLTYPES
- DAY
- Multicast
convenience alias for Sink::Multicast (this is not defined when requiring just coroutines/base)
Public Instance Methods
Evaluates to the next input value from the associated consumption context. In order to create a consumption context, you have to use #consum_for or #trans_for on the method calling await.
The behavior of await is undefined if the method using it is called without being wrapped by #consum_for or #trans_for. It should be an error to do so, just as it is an error to use yield without an associated block; however, since await is not a language primitive (unlike yield), it is not always possible to enforce this. This is likely to change in a future version if a better solution can be found.
# File lib/coroutines.rb, line 121
def await
  yielder = Fiber.current.instance_variable_get(:@yielder)
  raise CoroutineError, "you can't call a consumer" if yielder.nil?
  yielder.await
end
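The suspend-and-resume idea behind await can be illustrated with nothing but Ruby's built-in Fiber. This is a minimal sketch, not the library's implementation: `mini_await` is a hypothetical helper, and the real await goes through a yielder object stored on the current fiber rather than calling Fiber.yield directly.

```ruby
require 'fiber'

# Hypothetical stand-in for await (not part of coroutines): suspend the
# current fiber and evaluate to the value later passed to Fiber#resume.
def mini_await
  Fiber.yield
end

received = []

consumer = Fiber.new do
  loop do
    value = mini_await
    received << value * 2
  end
end

consumer.resume      # run the body up to the first mini_await
consumer.resume(1)   # mini_await evaluates to 1
consumer.resume(20)  # mini_await evaluates to 20

p received           # prints [2, 40]
```

The first resume plays the role that #consum_for plays for a real consumer: it runs the method until it blocks on its first input.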
# File examples/parse_apache.rb, line 42
def bytes_transferred
  total = 0
  loop do
    total += await["bytes"]
    yield total
  end
end
Creates a new Consumer coroutine from the given method. This is analogous to using Kernel#enum_for to create an Enumerator instance. The method is called immediately (with the given args). It executes until the first call to await, at which point #consum_for returns the Consumer instance. Calling consumer << obj resumes the consumer at the point where it last executed await, which evaluates to obj.
Calling consumer.close raises StopIteration at the point where the consumer last executed await; it is expected that it will terminate without executing await again. Consumer#close evaluates to the return value of the method.
Example:
def counter(start)
  result = start
  loop { result += await }
  "Final value: #{result}"
end

co = consum_for :counter, 10  #=> #<Consumer: main:counter (running)>
co << 10 << 1000 << 10000
co.close  #=> "Final value: 11020"
# File lib/coroutines.rb, line 156
def consum_for(meth, *args)
  cons = Consumer.new do |y|
    Fiber.current.instance_variable_set(:@yielder, y)
    send(meth, *args)
  end
  description = "#{inspect}:#{meth}"
  cons.define_singleton_method :inspect do
    state = if @fiber.alive? then "running" else @result.inspect end
    "#<Consumer: #{description} (#{state})>"
  end
  cons
end
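The life cycle described above (run to the first await, resume on <<, raise StopIteration on close) can be sketched with a bare Fiber. `MiniConsumer` is a hypothetical class written for this illustration and is not the library's Consumer; it assumes Ruby 2.7 or later, where Fiber#raise is available.

```ruby
require 'fiber'

# Hypothetical miniature of a consumer; not the library's Consumer class.
class MiniConsumer
  def initialize(&body)
    @fiber = Fiber.new(&body)
    @fiber.resume              # run the body up to its first Fiber.yield
  end

  # Feed one value into the body; returns self so calls can be chained.
  def <<(obj)
    @fiber.resume(obj)
    self
  end

  # Raise StopIteration at the suspended Fiber.yield and evaluate to the
  # return value of the body.
  def close
    @fiber.raise(StopIteration)
  end
end

co = MiniConsumer.new do
  result = 10
  loop { result += Fiber.yield }  # Kernel#loop rescues StopIteration
  "Final value: #{result}"
end

co << 10 << 1000 << 10000
final = co.close
puts final                        # prints "Final value: 11020"
```

Because Kernel#loop rescues StopIteration, the body falls out of its loop on close and its last expression becomes the result, which mirrors the counter example.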
# File examples/parse_apache.rb, line 64
def dump(label)
  loop do
    puts "#{label}: #{await}"
  end
end
Reads log entry Hashes; for each log entry with status 404, yields a String containing the entry's status, datetime, and request, separated by spaces.
# File examples/parse_apache.rb, line 33
def find_404
  loop do
    r = await
    if r["status"] == 404
      yield %w{status datetime request}.map{|col| r[col].to_s}.join(" ")
    end
  end
end
# File examples/parse_apache.rb, line 70
def follow(path)
  file = open(path)
  loop do
    begin
      yield file.readline
    rescue EOFError
      sleep 0.1
    end
  end
  file.close
end
Reads log lines in the following format: host referrer user [timestamp] “GET|POST request protocol” status bytes … For each parsable line, yields a Hash with the keys given by COLNAMES and the values parsed according to COLTYPES.
# File examples/parse_apache.rb, line 21
def parse
  loop do
    if await =~ /(\S+) (\S+) (\S+) \[(.*?)\] "(\S+) (\S+) (\S+)" (\S+) (\S+)/
      log_entry = $~.captures.zip(COLNAMES, COLTYPES).
        map{|val, name, type| [name, val.send(type)] }.
        out_connect(Hash.new)
      yield log_entry
    end
  end
end
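The per-line work done by parse can be tried on its own, outside a coroutine. In the sketch below the regular expression is the one from the listing, but `SKETCH_COLNAMES` and `SKETCH_COLTYPES` are hypothetical: the actual COLNAMES and COLTYPES are defined in examples/parse_apache.rb and are not shown in this document. The sketch also uses Array#to_h where the original uses out_connect(Hash.new).

```ruby
# Hypothetical column definitions, assumed for illustration only.
SKETCH_COLNAMES = %w[host referrer user datetime method request proto status bytes]
SKETCH_COLTYPES = %i[to_s to_s to_s to_s to_s to_s to_s to_i to_i]

LOG_PATTERN = /(\S+) (\S+) (\S+) \[(.*?)\] "(\S+) (\S+) (\S+)" (\S+) (\S+)/

# Returns a Hash for a parsable line, or nil if the line does not match.
def parse_line(line)
  match = LOG_PATTERN.match(line)
  return nil unless match

  match.captures.zip(SKETCH_COLNAMES, SKETCH_COLTYPES)
       .map { |val, name, type| [name, val.send(type)] }
       .to_h
end

line = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 404 2326'
entry = parse_line(line)

p entry["status"]               # prints 404
p entry["bytes"]                # prints 2326
p parse_line("not a log line")  # prints nil
```

Each capture is paired with a column name and a conversion method name, and the conversion is applied with send, so changing a column's type only requires editing the types list.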
Transformer that accepts lines of text and
- appends any lines starting with a space to the previous line
- parses each set of non-empty concatenated lines as a key:value pair and yields that pair as an Array
# File examples/dpkgstatus-bench.rb, line 11
def parse_dpkg_status
  line = await
  loop do
    accum = ""
    begin
      begin
        accum += line
        line = await
      end while line.start_with? " "
    ensure
      key, value = accum.split(":", 2)
      yield [key.strip, value.strip] if accum != "\n"
    end
  end
end
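For comparison, the same folding rule can be written without a coroutine, using Enumerable#slice_before on an in-memory list of lines. This is a sketch of the rule described above, not a replacement for the transformer; `fold_dpkg_fields` and the sample input are hypothetical.

```ruby
# Folds continuation lines (those starting with a space) into the previous
# line, drops blank records, and splits each record into [key, value].
def fold_dpkg_fields(lines)
  lines.slice_before { |line| !line.start_with?(" ") }
       .map(&:join)
       .reject { |record| record.strip.empty? }
       .map { |record| record.split(":", 2).map(&:strip) }
end

sample = [
  "Package: ruby\n",
  "Description: An interpreter\n",
  " of the Ruby language\n",
  "\n",
  "Package: vim\n"
]

pairs = fold_dpkg_fields(sample)
p pairs
# prints [["Package", "ruby"], ["Description", "An interpreter\n of the Ruby language"], ["Package", "vim"]]
```

The coroutine version has the advantage that it never needs the whole input at once: it emits each pair as soon as the next non-continuation line arrives.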
# File examples/parse_apache.rb, line 51
def request_rate
  last = await["datetime"]
  loop do
    n = 0
    begin
      this = await["datetime"]
      n += 1
    end while this == last
    yield n.to_f / (this - last).to_f / DAY
    last = this
  end
end
Creates a new Transformer instance that wraps the given method. This is analogous to using Kernel#enum_for to create an Enumerator instance, or using #consum_for to create a Consumer instance. The method is not executed immediately. The resulting Transformer can be connected to an enumerable (using transformer.in_connect(enum)) or to a sink (using transformer.out_connect(sink)). The point at which the transformer method gets started depends on how it is connected; in any case, however, the method will be called with the args given to trans_for. See Transformer for details.
Within the transformer method, await can be used to read the next input value (as in a Consumer, compare #consum_for), and yield can be used to produce an output value (i.e., when it is called, the method is given a block which accepts its output).
Example:
def running_sum(start)
  result = start
  loop { result += await; yield result }
end

tr = trans_for :running_sum, 3  #=> #<Transformer: main:running_sum>
sums = (1..10).out_connect(tr)  #=> #<Enumerator: #<Transformer: main:running_sum> <= 1..10>
sums.to_a  #=> [4, 6, 9, 13, 18, 24, 31, 39, 48, 58]
# File lib/coroutines.rb, line 198
def trans_for(meth, *args)
  trans = Transformer.new do |y|
    Fiber.current.instance_variable_set(:@yielder, y)
    send(meth, *args, &y.method(:yield))
  end
  description = "#{inspect}:#{meth}"
  trans.define_singleton_method :inspect do
    "#<Transformer: #{description}>"
  end
  trans
end
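The pull-one-value, push-one-value shape of a transformer connected to an enumerable can be approximated with Ruby's standard Enumerator alone. This is a minimal sketch under that assumption, not how Transformer is implemented; `running_sum_over` is a hypothetical name, with `input.next` standing in for await and `out <<` standing in for yield.

```ruby
# Hypothetical equivalent of running_sum connected to an enumerable.
def running_sum_over(source, start)
  Enumerator.new do |out|
    input = source.to_enum      # fresh external enumerator per iteration
    result = start
    loop do
      result += input.next      # plays the role of await
      out << result             # plays the role of yield
    end                         # Kernel#loop ends when input.next raises StopIteration
  end
end

sums = running_sum_over(1..10, 3)
p sums.to_a      # prints [4, 6, 9, 13, 18, 24, 31, 39, 48, 58]
p sums.first(3)  # prints [4, 6, 9]
```

As with the Transformer described above, nothing runs when the enumerator is created; the body only starts when a caller begins to iterate.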
Example shamelessly stolen from eli.thegreenplace.net/2009/08/29/co-routines-as-an-alternative-to-state-machines/
# File examples/frames.rb, line 6
def unwrap(header=0x61, footer=0x62, escape=0xAB)
  loop do
    byte = await
    next if byte != header
    frame = ""
    loop do
      byte = await
      case byte
      when footer
        yield frame
        break
      when escape
        frame += await.to_s(16)
      else
        frame += byte.to_s(16)
      end
    end
  end
end
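Reading the listing, unwrap skips bytes until it sees the header, collects the hexadecimal form of each following byte until the footer, and treats the byte after an escape as data even if it equals the footer. The sketch below runs the same logic over an in-memory byte array, with an external enumerator's `next` standing in for await. `unwrap_frames` and the sample bytes are hypothetical and chosen only to exercise each branch.

```ruby
# Hypothetical array-based rendering of the framing logic in unwrap.
def unwrap_frames(bytes, header = 0x61, footer = 0x62, escape = 0xAB)
  input = bytes.each            # external enumerator; next plays the role of await
  frames = []
  loop do
    next if input.next != header
    frame = +""
    loop do
      byte = input.next
      case byte
      when footer
        frames << frame
        break
      when escape
        frame << input.next.to_s(16)  # escaped byte is taken literally
      else
        frame << byte.to_s(16)
      end
    end
  end
  frames
end

sample = [0x11, 0x61, 0x01, 0xAB, 0x62, 0x02, 0x62, 0x61, 0x10, 0x62]
frames = unwrap_frames(sample)
p frames   # prints ["1622", "10"]
```

In the first frame the escaped 0x62 contributes "62" to the payload instead of ending the frame; the leading 0x11 is ignored because it precedes any header.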