diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index e178e239e7..3a9197ce87 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -250,15 +250,23 @@ class ZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.transp 'message from master').format(len(messages_len))) return self._decode_payload(payload) + @property + def stream(self): + if not hasattr(self, '_stream'): + self._stream = zmq.eventloop.zmqstream.ZMQStream(self._socket, io_loop=self.io_loop) + return self._stream + def on_recv(self, callback): ''' Register a callback for recieved messages (that we didn't initiate) ''' - self.stream = zmq.eventloop.zmqstream.ZMQStream(self._socket, io_loop=self.io_loop) + if callback is None: + return self.stream.on_recv(None) + def wrap_callback(messages): payload = self._decode_messages(messages) callback(payload) - self.stream.on_recv(wrap_callback) + return self.stream.on_recv(wrap_callback) def recv(self, timeout=0): '''