| Home | Trees | Indices | Help |
|---|
|
|
1 #
2 # Licensed to the Apache Software Foundation (ASF) under one
3 # or more contributor license agreements. See the NOTICE file
4 # distributed with this work for additional information
5 # regarding copyright ownership. The ASF licenses this file
6 # to you under the Apache License, Version 2.0 (the
7 # "License"); you may not use this file except in compliance
8 # with the License. You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing,
13 # software distributed under the License is distributed on an
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 # KIND, either express or implied. See the License for the
16 # specific language governing permissions and limitations
17 # under the License.
18 #
19
20 from __future__ import absolute_import
21
22 import errno
23 import logging
24 import socket
25 import time
26 import weakref
27
28 from ._condition import Condition
29 from ._delivery import Delivery
30 from ._endpoints import Endpoint
31 from ._events import Event, Handler, _dispatch
32 from ._exceptions import ProtonException
33 from ._io import IO
34 from ._message import Message
35 from ._selectable import Selectable
36 from ._transport import Transport
37 from ._url import Url
38
39 log = logging.getLogger("proton")
43 """
44 A utility for simpler and more intuitive handling of delivery
45 events related to outgoing i.e. sent messages.
46 """
47
51
53 if event.link.is_sender and event.link.credit \
54 and event.link.state & Endpoint.LOCAL_ACTIVE \
55 and event.link.state & Endpoint.REMOTE_ACTIVE:
56 self.on_sendable(event)
57
59 dlv = event.delivery
60 if dlv.link.is_sender and dlv.updated:
61 if dlv.remote_state == Delivery.ACCEPTED:
62 self.on_accepted(event)
63 elif dlv.remote_state == Delivery.REJECTED:
64 self.on_rejected(event)
65 elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED:
66 self.on_released(event)
67 if dlv.settled:
68 self.on_settled(event)
69 if self.auto_settle:
70 dlv.settle()
71
73 """
74 Called when the sender link has credit and messages can
75 therefore be transferred.
76 """
77 if self.delegate is not None:
78 _dispatch(self.delegate, 'on_sendable', event)
79
81 """
82 Called when the remote peer accepts an outgoing message.
83 """
84 if self.delegate is not None:
85 _dispatch(self.delegate, 'on_accepted', event)
86
88 """
89 Called when the remote peer rejects an outgoing message.
90 """
91 if self.delegate is not None:
92 _dispatch(self.delegate, 'on_rejected', event)
93
95 """
96 Called when the remote peer releases an outgoing message. Note
97 that this may be in response to either the RELEASE or MODIFIED
98 state as defined by the AMQP specification.
99 """
100 if self.delegate is not None:
101 _dispatch(self.delegate, 'on_released', event)
102
111
114 msg = Message()
115 msg.decode(delivery.link.recv(delivery.pending))
116 delivery.link.advance()
117 return msg
118
125
132
136 """
137 Accepts a received message.
138
139 Note that this method cannot currently be used in combination
140 with transactions.
141 """
142 self.settle(delivery, Delivery.ACCEPTED)
143
145 """
146 Rejects a received message that is considered invalid or
147 unprocessable.
148 """
149 self.settle(delivery, Delivery.REJECTED)
150
152 """
153 Releases a received message, making it available at the source
154 for any (other) interested receiver. The ``delivered``
155 parameter indicates whether this should be considered a
156 delivery attempt (and the delivery count updated) or not.
157 """
158 if delivered:
159 self.settle(delivery, Delivery.MODIFIED)
160 else:
161 self.settle(delivery, Delivery.RELEASED)
162
167
170 """
171 A utility for simpler and more intuitive handling of delivery
172 events related to incoming i.e. received messages.
173 """
174
178
180 dlv = event.delivery
181 if not dlv.link.is_receiver: return
182 if dlv.aborted:
183 self.on_aborted(event)
184 dlv.settle()
185 elif dlv.readable and not dlv.partial:
186 event.message = recv_msg(dlv)
187 if event.link.state & Endpoint.LOCAL_CLOSED:
188 if self.auto_accept:
189 dlv.update(Delivery.RELEASED)
190 dlv.settle()
191 else:
192 try:
193 self.on_message(event)
194 if self.auto_accept:
195 dlv.update(Delivery.ACCEPTED)
196 dlv.settle()
197 except Reject:
198 dlv.update(Delivery.REJECTED)
199 dlv.settle()
200 except Release:
201 dlv.update(Delivery.MODIFIED)
202 dlv.settle()
203 elif dlv.updated and dlv.settled:
204 self.on_settled(event)
205
207 """
208 Called when a message is received. The message itself can be
209 obtained as a property on the event. For the purpose of
210 referring to this message in further actions (e.g. if
211 explicitly accepting it, the ``delivery`` should be used, also
212 obtainable via a property on the event.
213 """
214 if self.delegate is not None:
215 _dispatch(self.delegate, 'on_message', event)
216
220
224
227 """
228 A utility that exposes 'endpoint' events i.e. the open/close for
229 links, sessions and connections in a more intuitive manner. A
230 XXX_opened method will be called when both local and remote peers
231 have opened the link, session or connection. This can be used to
232 confirm a locally initiated action for example. A XXX_opening
233 method will be called when the remote peer has requested an open
234 that was not initiated locally. By default this will simply open
235 locally, which then triggers the XXX_opened call. The same applies
236 to close.
237 """
238
242
243 @classmethod
246
247 @classmethod
250
251 @classmethod
254
255 @classmethod
258
259 @classmethod
262
263 @classmethod
265 if endpoint.remote_condition:
266 log.error(endpoint.remote_condition.description or endpoint.remote_condition.name)
267 elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
268 log.error("%s closed by peer" % endpoint_type)
269
271 if event.link.remote_condition:
272 self.on_link_error(event)
273 elif self.is_local_closed(event.link):
274 self.on_link_closed(event)
275 else:
276 self.on_link_closing(event)
277 event.link.close()
278
280 if event.session.remote_condition:
281 self.on_session_error(event)
282 elif self.is_local_closed(event.session):
283 self.on_session_closed(event)
284 else:
285 self.on_session_closing(event)
286 event.session.close()
287
289 if event.connection.remote_condition:
290 if event.connection.remote_condition.name == "amqp:connection:forced":
291 # Treat this the same as just having the transport closed by the peer without
292 # sending any events. Allow reconnection to happen transparently.
293 return
294 self.on_connection_error(event)
295 elif self.is_local_closed(event.connection):
296 self.on_connection_closed(event)
297 else:
298 self.on_connection_closing(event)
299 event.connection.close()
300
304
306 if self.is_local_open(event.connection):
307 self.on_connection_opened(event)
308 elif self.is_local_uninitialised(event.connection):
309 self.on_connection_opening(event)
310 event.connection.open()
311
315
317 if self.is_local_open(event.session):
318 self.on_session_opened(event)
319 elif self.is_local_uninitialised(event.session):
320 self.on_session_opening(event)
321 event.session.open()
322
326
328 if self.is_local_open(event.link):
329 self.on_link_opened(event)
330 elif self.is_local_uninitialised(event.link):
331 self.on_link_opening(event)
332 event.link.open()
333
337
341
345
349
353
357
359 if self.delegate is not None:
360 _dispatch(self.delegate, 'on_connection_error', event)
361 else:
362 self.log_error(event.connection, "connection")
363
365 if self.delegate is not None:
366 _dispatch(self.delegate, 'on_session_error', event)
367 else:
368 self.log_error(event.session, "session")
369 event.connection.close()
370
372 if self.delegate is not None:
373 _dispatch(self.delegate, 'on_link_error', event)
374 else:
375 self.log_error(event.link, "link")
376 event.connection.close()
377
381
385
389
391 if self.delegate is not None:
392 _dispatch(self.delegate, 'on_connection_closing', event)
393 elif self.peer_close_is_error:
394 self.on_connection_error(event)
395
397 if self.delegate is not None:
398 _dispatch(self.delegate, 'on_session_closing', event)
399 elif self.peer_close_is_error:
400 self.on_session_error(event)
401
403 if self.delegate is not None:
404 _dispatch(self.delegate, 'on_link_closing', event)
405 elif self.peer_close_is_error:
406 self.on_link_error(event)
407
409 self.on_transport_closed(event)
410
412 if self.delegate is not None and event.connection and self.is_local_open(event.connection):
413 _dispatch(self.delegate, 'on_disconnected', event)
414
417 """
418 A general purpose handler that makes the proton-c events somewhat
419 simpler to deal with and/or avoids repetitive tasks for common use
420 cases.
421 """
422
423 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
424 self.handlers = []
425 if prefetch:
426 self.handlers.append(FlowController(prefetch))
427 self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self)))
428 self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self)))
429 self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self)))
430 self.fatal_conditions = ["amqp:unauthorized-access"]
431
433 """
434 Called when some error is encountered with the transport over
435 which the AMQP connection is to be established. This includes
436 authentication errors as well as socket errors.
437 """
438 if event.transport.condition:
439 if event.transport.condition.info:
440 log.error("%s: %s: %s" % (
441 event.transport.condition.name, event.transport.condition.description,
442 event.transport.condition.info))
443 else:
444 log.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
445 if event.transport.condition.name in self.fatal_conditions:
446 event.connection.close()
447 else:
448 logging.error("Unspecified transport error")
449
451 """
452 Called when the peer closes the connection with an error condition.
453 """
454 EndpointStateHandler.print_error(event.connection, "connection")
455
457 """
458 Called when the peer closes the session with an error condition.
459 """
460 EndpointStateHandler.print_error(event.session, "session")
461 event.connection.close()
462
464 """
465 Called when the peer closes the link with an error condition.
466 """
467 EndpointStateHandler.print_error(event.link, "link")
468 event.connection.close()
469
471 """
472 Called when the event loop - the reactor - starts.
473 """
474 if hasattr(event.reactor, 'subclass'):
475 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
476 self.on_start(event)
477
483
489
495
501
507
513
519
525
527 """
528 Called when the sender link has credit and messages can
529 therefore be transferred.
530 """
531 pass
532
538
544
546 """
547 Called when the remote peer releases an outgoing message. Note
548 that this may be in response to either the RELEASE or MODIFIED
549 state as defined by the AMQP specification.
550 """
551 pass
552
554 """
555 Called when the remote peer has settled the outgoing
556 message. This is the point at which it should never be
557 retransmitted.
558 """
559 pass
560
562 """
563 Called when a message is received. The message itself can be
564 obtained as a property on the event. For the purpose of
565 referring to this message in further actions (e.g. if
566 explicitly accepting it, the ``delivery`` should be used, also
567 obtainable via a property on the event.
568 """
569 pass
570
573 """
574 The interface for transaction handlers, i.e. objects that want to
575 be notified of state changes related to a transaction.
576 """
577
580
583
586
589
592
595 """
596 An extension to the MessagingHandler for applications using
597 transactions.
598 """
599
600 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
601 super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)
602
604 if transaction:
605 transaction.accept(delivery)
606 else:
607 super(TransactionalClientHandler, self).accept(delivery)
608
614
616 self._flow(event.link)
617
619 self._flow(event.link)
620
622 self._flow(event.link)
623
625 self._flow(event.link)
626
633
636
637 @staticmethod
642
643 @staticmethod
648
649 @staticmethod
651 link = event.link
652 if link.state & Endpoint.LOCAL_UNINIT:
653 link.source.copy(link.remote_source)
654 link.target.copy(link.remote_target)
655 link.open()
656
657 @staticmethod
662
663 @staticmethod
668
669 @staticmethod
674
675
676 # Back compatibility definitions
677 CFlowController = FlowController
678 CHandshaker = Handshaker
682
686
688 event.dispatch(self.delegate)
689
692
695
701
703 reactor = event.reactor
704 # check if we are still quiesced, other handlers of
705 # on_reactor_quiesced could have produced events to process
706 if not reactor.quiesced: return
707
708 reading = []
709 writing = []
710 deadline = None
711 for sel in self.selectables:
712 if sel.reading:
713 reading.append(sel)
714 if sel.writing:
715 writing.append(sel)
716 if sel.deadline:
717 if deadline is None:
718 deadline = sel.deadline
719 else:
720 deadline = min(sel.deadline, deadline)
721
722 if deadline is not None:
723 timeout = deadline - time.time()
724 else:
725 timeout = reactor.timeout
726 if timeout < 0: timeout = 0
727 timeout = min(timeout, reactor.timeout)
728 readable, writable, _ = IO.select(reading, writing, [], timeout)
729
730 now = reactor.mark()
731
732 for s in readable:
733 s.readable()
734 for s in writable:
735 s.writable()
736 for s in self.selectables:
737 if s.deadline and now > s.deadline:
738 s.expired()
739
740 reactor.yield_()
741
745
748
753
757
759 s = event.selectable
760 self._selector.remove(s)
761 s._reactor._selectables -= 1
762 s.release()
763
765 r = event.reactor
766
767 if not r.quiesced:
768 return
769
770 d = r.timer_deadline
771 readable, writable, expired = self._selector.select(r.timeout)
772
773 now = r.mark()
774
775 for s in readable:
776 s.readable()
777 for s in writable:
778 s.writable()
779 for s in expired:
780 s.expired()
781
782 r.yield_()
783
785 s = event.selectable
786 t = s._transport
787
788 # If we're an acceptor we can't have a transport
789 # and we don't want to do anything here in any case
790 if not t:
791 return
792
793 capacity = t.capacity()
794 if capacity > 0:
795 try:
796 b = s.recv(capacity)
797 if len(b) > 0:
798 n = t.push(b)
799 else:
800 # EOF handling
801 self.on_selectable_error(event)
802 except socket.error as e:
803 # TODO: What's the error handling to be here?
804 log.error("Couldn't recv: %r" % e)
805 t.close_tail()
806
807 # Always update as we may have gone to not reading or from
808 # not writing to writing when processing the incoming bytes
809 r = s._reactor
810 self.update(t, s, r.now)
811
813 s = event.selectable
814 t = s._transport
815
816 # If we're an acceptor we can't have a transport
817 # and we don't want to do anything here in any case
818 if not t:
819 return
820
821 pending = t.pending()
822 if pending > 0:
823
824 try:
825 n = s.send(t.peek(pending))
826 t.pop(n)
827 except socket.error as e:
828 log.error("Couldn't send: %r" % e)
829 # TODO: Error? or actually an exception
830 t.close_head()
831
832 newpending = t.pending()
833 if newpending != pending:
834 r = s._reactor
835 self.update(t, s, r.now)
836
838 s = event.selectable
839 t = s._transport
840
841 t.close_head()
842 t.close_tail()
843 s.terminate()
844 s.update()
845
852
854 c = event.connection
855 if not c.state & Endpoint.REMOTE_UNINIT:
856 return
857
858 t = Transport()
859 # It seems perverse, but the C code ignores bind errors too!
860 # and this is required or you get errors because Connector() has already
861 # bound the transport and connection!
862 t.bind_nothrow(c)
863
865 c = event.connection
866 t = event.transport
867
868 reactor = c._reactor
869
870 # link the new transport to its reactor:
871 t._reactor = reactor
872
873 if c._acceptor:
874 # this connection was created by the acceptor. There is already a
875 # socket assigned to this connection. Nothing needs to be done.
876 return
877
878 url = c.url or Url(c.hostname)
879 url.defaults()
880
881 host = url.host
882 port = int(url.port)
883
884 if not c.user:
885 user = url.username
886 if user:
887 c.user = user
888 password = url.password
889 if password:
890 c.password = password
891
892 addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
893
894 # Try first possible address
895 log.debug("Connect trying first transport address: %s", addrs[0])
896 sock = IO.connect(addrs[0])
897
898 # At this point we need to arrange to be called back when the socket is writable
899 connector = ConnectSelectable(sock, reactor, addrs[1:], t, self)
900 connector.collect(reactor._collector)
901 connector.writing = True
902 connector.push_event(connector, Event.SELECTABLE_INIT)
903
904 # TODO: Don't understand why we need this now - how can we get PN_TRANSPORT until the connection succeeds?
905 t._selectable = None
906
907 @staticmethod
909 try:
910 capacity = transport.capacity()
911 selectable.reading = capacity>0
912 except:
913 if transport.closed:
914 selectable.terminate()
915 try:
916 pending = transport.pending()
917 selectable.writing = pending>0
918 except:
919 if transport.closed:
920 selectable.terminate()
921 selectable.deadline = transport.tick(now)
922 selectable.update()
923
925 t = event.transport
926 r = t._reactor
927 s = t._selectable
928 if s and not s.is_terminal:
929 self.update(t, s, r.now)
930
932 t = event.transport
933 r = t._reactor
934 s = t._selectable
935 if s and not s.is_terminal:
936 s.terminate()
937 r.update(s)
938 t.unbind()
939
943 super(ConnectSelectable, self).__init__(sock, reactor)
944 self._addrs = addrs
945 self._transport = transport
946 self._iohandler = iohandler
947
950
952 e = self._delegate.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
953 t = self._transport
954 if e == 0:
955 log.debug("Connection succeeded")
956 s = self._reactor.selectable(delegate=self._delegate)
957 s._transport = t
958 t._selectable = s
959 self._iohandler.update(t, s, t._reactor.now)
960
961 # Disassociate from the socket (which has been passed on)
962 self._delegate = None
963 self.terminate()
964 self.update()
965 return
966 elif e == errno.ECONNREFUSED:
967 if len(self._addrs) > 0:
968 log.debug("Connection refused: trying next transport address: %s", self._addrs[0])
969 sock = IO.connect(self._addrs[0])
970 self._addrs = self._addrs[1:]
971 self._delegate.close()
972 self._delegate = sock
973 return
974 else:
975 log.debug("Connection refused, but tried all transport addresses")
976 t.condition = Condition("proton.pythonio", "Connection refused to all addresses")
977 else:
978 log.error("Couldn't connect: %s", e)
979 t.condition = Condition("proton.pythonio", "Connection error: %s" % e)
980
981 t.close_tail()
982 t.close_head()
983 self.terminate()
984 self.update()
985
| Home | Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Thu Nov 21 10:53:51 2019 | http://epydoc.sourceforge.net |