Linux server.edchosting.com 4.18.0-553.79.1.lve.el7h.x86_64 #1 SMP Wed Oct 15 16:34:46 UTC 2025 x86_64
LiteSpeed
Server IP : 75.98.162.185 & Your IP : 216.73.216.163
Domains :
Cant Read [ /etc/named.conf ]
User : goons4good
Terminal
Auto Root
Create File
Create Folder
Localroot Suggester
Backdoor Destroyer
Readme
/
lib64 /
python3.6 /
site-packages /
zmq /
backend /
cffi /
Delete
Unzip
Name
Size
Permission
Date
Action
__pycache__
[ DIR ]
drwxr-xr-x
2022-03-29 05:07
__init__.py
616
B
-rw-r--r--
2018-02-10 08:02
_cdefs.h
1.94
KB
-rw-r--r--
2018-02-10 08:02
_cffi.py
3.79
KB
-rw-r--r--
2018-02-10 08:02
_poll.py
2.77
KB
-rw-r--r--
2018-02-10 08:02
_verify.c
208
B
-rw-r--r--
2018-02-10 08:02
constants.py
357
B
-rw-r--r--
2018-02-10 08:02
context.py
2.52
KB
-rw-r--r--
2018-02-10 08:02
devices.py
580
B
-rw-r--r--
2018-02-10 08:02
error.py
353
B
-rw-r--r--
2018-02-10 08:02
message.py
1.36
KB
-rw-r--r--
2018-02-10 08:02
socket.py
8.49
KB
-rw-r--r--
2018-02-10 08:02
utils.py
1.45
KB
-rw-r--r--
2018-02-10 08:02
Save
Rename
# coding: utf-8
"""zmq poll function"""

# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.

try:
    from time import monotonic
except ImportError:
    from time import clock as monotonic
import warnings

from ._cffi import C, ffi
from zmq.error import InterruptedSystemCall, _check_rc


def _make_zmq_pollitem(socket, flags):
    """Return a ``zmq_pollitem_t`` struct that watches a zmq socket object.

    ``socket`` must expose the low-level handle as ``_zmq_socket``; ``flags``
    is stored in the ``events`` field. ``fd`` and ``revents`` are zeroed.
    """
    handle = socket._zmq_socket
    entry = ffi.new('zmq_pollitem_t*')
    entry.socket = handle
    entry.fd = 0
    entry.events = flags
    entry.revents = 0
    return entry[0]


def _make_zmq_pollitem_fromfd(socket_fd, flags):
    """Return a ``zmq_pollitem_t`` struct that watches a raw file descriptor.

    The ``socket`` field is set to NULL, ``fd`` to ``socket_fd`` and
    ``events`` to ``flags``; ``revents`` is zeroed.
    """
    entry = ffi.new('zmq_pollitem_t*')
    entry.socket = ffi.NULL
    entry.fd = socket_fd
    entry.events = flags
    entry.revents = 0
    return entry[0]


def zmq_poll(sockets, timeout):
    """Poll sockets and file descriptors through the C-level ``zmq_poll``.

    Parameters
    ----------
    sockets : iterable
        Indexable items where ``item[0]`` is a ``zmq.Socket``, an ``int``
        file descriptor, or any object providing ``fileno()``, and
        ``item[1]`` is the event mask to wait for.
    timeout : number
        Timeout handed to ``zmq_poll`` (cast to a C ``long``), in
        milliseconds. When a poll is interrupted and the timeout is
        positive, the elapsed milliseconds are subtracted before retrying.

    Returns
    -------
    list of tuple
        ``(socket_or_fd, revents)`` for every polled item whose ``revents``
        is greater than zero. Socket entries return the original socket
        object; file-descriptor entries return the integer fd.

    Errors reported by ``_check_rc`` other than ``InterruptedSystemCall``
    propagate to the caller.
    """
    # Imported at call time rather than at module import time.
    from zmq import Socket

    pollitems = []
    registered = {}
    for entry in sockets:
        target = entry[0]
        if isinstance(target, Socket):
            registered[target._zmq_socket] = entry
            pollitems.append(_make_zmq_pollitem(target, entry[1]))
            continue
        if not isinstance(target, int):
            # not an FD, get it from fileno()
            entry = (target.fileno(), entry[1])
        fd = entry[0]
        registered[fd] = entry
        pollitems.append(_make_zmq_pollitem_fromfd(fd, entry[1]))

    c_items = ffi.new('zmq_pollitem_t[]', pollitems)
    n_items = ffi.cast('int', len(pollitems))

    while True:
        c_timeout = ffi.cast('long', timeout)
        began = monotonic()
        rc = C.zmq_poll(c_items, n_items, c_timeout)
        try:
            _check_rc(rc)
        except InterruptedSystemCall:
            # Interrupted: retry, shrinking a positive timeout by the time
            # already spent waiting. Non-positive timeouts are retried as-is.
            if timeout > 0:
                elapsed_ms = int(1000 * (monotonic() - began))
                if elapsed_ms < 0:
                    # don't allow a negative elapsed time, which can happen
                    # on old Python versions without time.monotonic.
                    warnings.warn(
                        "Negative elapsed time for interrupted poll: %s."
                        " Did the clock change?" % elapsed_ms,
                        RuntimeWarning)
                    elapsed_ms = 0
                timeout = max(0, timeout - elapsed_ms)
            continue
        break

    ready = []
    for position in range(len(c_items)):
        polled = c_items[position]
        if polled.revents <= 0:
            continue
        if polled.socket == ffi.NULL:
            # fd-based entry: report the descriptor itself
            ready.append((polled.fd, polled.revents))
        else:
            # socket-based entry: map the C handle back to the Socket object
            ready.append((registered[polled.socket][0], polled.revents))
    return ready


__all__ = ['zmq_poll']