参考 Blinker Documentation
Blinker 是一个基于Python的强大的信号库,它既支持简单的对象到对象通信,也支持针对多个对象进行组播。Flask的信号机制就是基于它建立的。
Blinker的内核虽然小巧,但是功能却非常强大,它支持以下特性:
- 支持注册全局命名信号
- 支持匿名信号
- 支持自定义命名信号
- 支持与接收者之间的持久连接与短暂连接
- 通过弱引用实现与接收者之间的自动断开连接
- 支持发送任意大小的数据
- 支持收集信号接收者的返回值
- 线程安全
创建信号
信号通过signal()
方法进行创建:
1 2 3 4 |
>>> from blinker import signal >>> initialized = signal("initialized") >>> initialized is signal("initialized") True |
每次调用signal('name')
都会返回同一个信号对象。因此这里signal()
方法使用了单例模式。
订阅信号
使用Signal.connect()
方法注册一个函数,每当触发信号的时候,就会调用该函数。该函数以触发信号的对象作为参数,这个函数其实就是信号订阅者。
1 2 3 4 5 6 |
>>> def subscriber(sender): ... print("Got a signal sent by %r" % sender) ... >>> ready = signal('ready') >>> ready.connect(subscriber) <function subscriber at 0x...> |
触发信号
使用Signal.send()
方法通知信号订阅者。
下面定义类Processor
,在它的go()
方法中触发前面声明的ready
信号,send()
方法以self
为参数,也就是说Processor
的实例是信号的发送者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
>>> class Processor: ... def __init__(self, name): ... self.name = name ... ... def go(self): ... ready = signal('ready') ... ready.send(self) ... print("Processing.") ... complete = signal('complete') ... complete.send(self) ... ... def __repr__(self): ... return '<Processor %s>' % self.name ... >>> processor_a = Processor('a') >>> processor_a.go() Got a signal sent by Processing. |
注意到go()
方法中的complete
信号没?并没有订阅者订阅该信号,但是依然可以触发该信号。如果没有任何订阅者的信号,结果是什么信号也不会发送,而且Blinker
内部对这种情况进行了优化,以尽可能的减少内存开销。
订阅特定的发布者
默认情况下,任意发布者触发信号,都会通知订阅者。可以给Signal.connect()
传递一个可选的参数,以便限制订阅者只能订阅特定发送者。
1 2 3 4 5 6 7 |
>>> def b_subscriber(sender): ... print("Caught signal from processor_b.") ... assert sender.name == 'b' ... >>> processor_b = Processor('b') >>> ready.connect(b_subscriber, sender=processor_b) <function b_subscriber at 0x...> |
现在订阅者只订阅了processor_b
发布的ready
信号:
1 2 3 4 5 6 7 |
>>> processor_a.go() Got a signal sent by Processing. >>> processor_b.go() Got a signal sent by Caught signal from processor_b. Processing. |
通过信号收发数据
可以给send()
方法传递额外的关键字参数,这些参数会传递给订阅者。