Linux server.edchosting.com 4.18.0-553.79.1.lve.el7h.x86_64 #1 SMP Wed Oct 15 16:34:46 UTC 2025 x86_64
LiteSpeed
Server IP : 75.98.162.185 & Your IP : 216.73.216.163
Domains :
Cant Read [ /etc/named.conf ]
User : goons4good
Terminal
Auto Root
Create File
Create Folder
Localroot Suggester
Backdoor Destroyer
Readme
/
lib /
python2.7 /
site-packages /
supervisor /
tests /
Delete
Unzip
Name
Size
Permission
Date
Action
fixtures
[ DIR ]
drwxr-xr-x
2024-07-29 10:18
__init__.py
20
B
-rw-r--r--
2020-03-10 20:45
__init__.pyc
148
B
-rw-r--r--
2020-03-10 20:45
__init__.pyo
148
B
-rw-r--r--
2020-03-10 20:45
base.py
33.88
KB
-rw-r--r--
2020-03-10 20:45
base.pyc
52.56
KB
-rw-r--r--
2020-03-10 20:45
base.pyo
52.56
KB
-rw-r--r--
2020-03-10 20:45
test_childutils.py
5.27
KB
-rw-r--r--
2020-03-10 20:45
test_childutils.pyc
7.63
KB
-rw-r--r--
2020-03-10 20:45
test_childutils.pyo
7.63
KB
-rw-r--r--
2020-03-10 20:45
test_confecho.py
540
B
-rw-r--r--
2020-03-10 20:45
test_confecho.pyc
1.25
KB
-rw-r--r--
2020-03-10 20:45
test_confecho.pyo
1.25
KB
-rw-r--r--
2020-03-10 20:45
test_datatypes.py
26.65
KB
-rw-r--r--
2020-03-10 20:45
test_datatypes.pyc
47.5
KB
-rw-r--r--
2020-03-10 20:45
test_datatypes.pyo
47.5
KB
-rw-r--r--
2020-03-10 20:45
test_dispatchers.py
47.68
KB
-rw-r--r--
2020-03-10 20:45
test_dispatchers.pyc
47.49
KB
-rw-r--r--
2020-03-10 20:45
test_dispatchers.pyo
47.49
KB
-rw-r--r--
2020-03-10 20:45
test_events.py
20.35
KB
-rw-r--r--
2020-03-10 20:45
test_events.pyc
24.96
KB
-rw-r--r--
2020-03-10 20:45
test_events.pyo
24.96
KB
-rw-r--r--
2020-03-10 20:45
test_http.py
24.48
KB
-rw-r--r--
2020-03-10 20:45
test_http.pyc
34.38
KB
-rw-r--r--
2020-03-10 20:45
test_http.pyo
34.38
KB
-rw-r--r--
2020-03-10 20:45
test_loggers.py
12.65
KB
-rw-r--r--
2020-03-10 20:45
test_loggers.pyc
18.18
KB
-rw-r--r--
2020-03-10 20:45
test_loggers.pyo
18.18
KB
-rw-r--r--
2020-03-10 20:45
test_options.py
132.88
KB
-rw-r--r--
2020-03-10 20:45
test_options.pyc
136.01
KB
-rw-r--r--
2020-03-10 20:45
test_options.pyo
136.01
KB
-rw-r--r--
2020-03-10 20:45
test_poller.py
16.29
KB
-rw-r--r--
2020-03-10 20:45
test_poller.pyc
20.74
KB
-rw-r--r--
2020-03-10 20:45
test_poller.pyo
20.5
KB
-rw-r--r--
2020-03-10 20:45
test_process.py
91.59
KB
-rw-r--r--
2020-03-10 20:45
test_process.pyc
93.63
KB
-rw-r--r--
2020-03-10 20:45
test_process.pyo
93.63
KB
-rw-r--r--
2020-03-10 20:45
test_rpcinterfaces.py
94.86
KB
-rw-r--r--
2020-03-10 20:45
test_rpcinterfaces.pyc
80.93
KB
-rw-r--r--
2020-03-10 20:45
test_rpcinterfaces.pyo
80.93
KB
-rw-r--r--
2020-03-10 20:45
test_socket_manager.py
7.84
KB
-rw-r--r--
2020-03-10 20:45
test_socket_manager.pyc
11.29
KB
-rw-r--r--
2020-03-10 20:45
test_socket_manager.pyo
11.29
KB
-rw-r--r--
2020-03-10 20:45
test_states.py
2.22
KB
-rw-r--r--
2020-03-10 20:45
test_states.pyc
4.4
KB
-rw-r--r--
2020-03-10 20:45
test_states.pyo
4.4
KB
-rw-r--r--
2020-03-10 20:45
test_supervisorctl.py
66.26
KB
-rw-r--r--
2020-03-10 20:45
test_supervisorctl.pyc
87.29
KB
-rw-r--r--
2020-03-10 20:45
test_supervisorctl.pyo
87.29
KB
-rw-r--r--
2020-03-10 20:45
test_supervisord.py
29.39
KB
-rw-r--r--
2020-03-10 20:45
test_supervisord.pyc
27.4
KB
-rw-r--r--
2020-03-10 20:45
test_supervisord.pyo
27.4
KB
-rw-r--r--
2020-03-10 20:45
test_web.py
6.67
KB
-rw-r--r--
2020-03-10 20:45
test_web.pyc
9.52
KB
-rw-r--r--
2020-03-10 20:45
test_web.pyo
9.52
KB
-rw-r--r--
2020-03-10 20:45
test_xmlrpc.py
34.05
KB
-rw-r--r--
2020-03-10 20:45
test_xmlrpc.pyc
47.64
KB
-rw-r--r--
2020-03-10 20:45
test_xmlrpc.pyo
47.64
KB
-rw-r--r--
2020-03-10 20:45
Save
Rename
"""Test suite for supervisor.socket_manager""" import gc import sys import os import unittest import socket import tempfile try: import __pypy__ except ImportError: __pypy__ = None from supervisor.tests.base import DummySocketConfig from supervisor.tests.base import DummyLogger from supervisor.datatypes import UnixStreamSocketConfig from supervisor.datatypes import InetStreamSocketConfig class TestObject: def __init__(self): self.value = 5 def getValue(self): return self.value def setValue(self, val): self.value = val class ProxyTest(unittest.TestCase): def setUp(self): self.on_deleteCalled = False def _getTargetClass(self): from supervisor.socket_manager import Proxy return Proxy def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) def setOnDeleteCalled(self): self.on_deleteCalled = True def test_proxy_getattr(self): proxy = self._makeOne(TestObject()) self.assertEqual(5, proxy.getValue()) def test_on_delete(self): proxy = self._makeOne(TestObject(), on_delete=self.setOnDeleteCalled) self.assertEqual(5, proxy.getValue()) proxy = None gc_collect() self.assertTrue(self.on_deleteCalled) class ReferenceCounterTest(unittest.TestCase): def setUp(self): self.running = False def start(self): self.running = True def stop(self): self.running = False def _getTargetClass(self): from supervisor.socket_manager import ReferenceCounter return ReferenceCounter def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) def test_incr_and_decr(self): ctr = self._makeOne(on_zero=self.stop,on_non_zero=self.start) self.assertFalse(self.running) ctr.increment() self.assertTrue(self.running) self.assertEqual(1, ctr.get_count()) ctr.increment() self.assertTrue(self.running) self.assertEqual(2, ctr.get_count()) ctr.decrement() self.assertTrue(self.running) self.assertEqual(1, ctr.get_count()) ctr.decrement() self.assertFalse(self.running) self.assertEqual(0, ctr.get_count()) def test_decr_at_zero_raises_error(self): ctr = self._makeOne(on_zero=self.stop,on_non_zero=self.start) self.assertRaises(Exception, ctr.decrement) class SocketManagerTest(unittest.TestCase): def tearDown(self): gc_collect() def _getTargetClass(self): from supervisor.socket_manager import SocketManager return SocketManager def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) def test_repr(self): conf = DummySocketConfig(2) sock_manager = self._makeOne(conf) expected = "<%s at %s for %s>" % ( sock_manager.__class__, id(sock_manager), conf.url) self.assertEqual(repr(sock_manager), expected) def test_get_config(self): conf = DummySocketConfig(2) sock_manager = self._makeOne(conf) self.assertEqual(conf, sock_manager.config()) def test_tcp_w_hostname(self): conf = InetStreamSocketConfig('localhost', 51041) sock_manager = self._makeOne(conf) self.assertEqual(sock_manager.socket_config, conf) sock = sock_manager.get_socket() self.assertEqual(sock.getsockname(), ('127.0.0.1', 51041)) def test_tcp_w_ip(self): conf = InetStreamSocketConfig('127.0.0.1', 51041) sock_manager = self._makeOne(conf) self.assertEqual(sock_manager.socket_config, conf) sock = sock_manager.get_socket() self.assertEqual(sock.getsockname(), ('127.0.0.1', 51041)) def test_unix(self): (tf_fd, tf_name) = tempfile.mkstemp(); conf = UnixStreamSocketConfig(tf_name) sock_manager = self._makeOne(conf) self.assertEqual(sock_manager.socket_config, conf) sock = sock_manager.get_socket() self.assertEqual(sock.getsockname(), tf_name) sock = None os.close(tf_fd) def test_socket_lifecycle(self): conf = DummySocketConfig(2) sock_manager = self._makeOne(conf) # Assert that sockets are created on demand self.assertFalse(sock_manager.is_prepared()) # Get two socket references sock = sock_manager.get_socket() self.assertTrue(sock_manager.is_prepared()) #socket created on demand sock_id = id(sock._get()) sock2 = sock_manager.get_socket() sock2_id = id(sock2._get()) # Assert that they are not the same proxy object self.assertNotEqual(sock, sock2) # Assert that they are the same underlying socket self.assertEqual(sock_id, sock2_id) # Socket not actually closed yet b/c ref ct is 2 self.assertEqual(2, sock_manager.get_socket_ref_count()) self.assertTrue(sock_manager.is_prepared()) self.assertFalse(sock_manager.socket.close_called) sock = None gc_collect() # Socket not actually closed yet b/c ref ct is 1 self.assertTrue(sock_manager.is_prepared()) self.assertFalse(sock_manager.socket.close_called) sock2 = None gc_collect() # Socket closed self.assertFalse(sock_manager.is_prepared()) self.assertTrue(sock_manager.socket.close_called) # Get a new socket reference sock3 = sock_manager.get_socket() self.assertTrue(sock_manager.is_prepared()) sock3_id = id(sock3._get()) # Assert that it is not the same socket self.assertNotEqual(sock_id, sock3_id) # Drop ref ct to zero del sock3 gc_collect() # Now assert that socket is closed self.assertFalse(sock_manager.is_prepared()) self.assertTrue(sock_manager.socket.close_called) def test_logging(self): conf = DummySocketConfig(1) logger = DummyLogger() sock_manager = self._makeOne(conf, logger=logger) # socket open sock = sock_manager.get_socket() self.assertEqual(len(logger.data), 1) self.assertEqual('Creating socket %s' % repr(conf), logger.data[0]) # socket close del sock gc_collect() self.assertEqual(len(logger.data), 2) self.assertEqual('Closing socket %s' % repr(conf), logger.data[1]) def test_prepare_socket(self): conf = DummySocketConfig(1) sock_manager = self._makeOne(conf) sock = sock_manager.get_socket() self.assertTrue(sock_manager.is_prepared()) self.assertFalse(sock.bind_called) self.assertTrue(sock.listen_called) self.assertEqual(sock.listen_backlog, socket.SOMAXCONN) self.assertFalse(sock.close_called) def test_tcp_socket_already_taken(self): conf = InetStreamSocketConfig('127.0.0.1', 51041) sock_manager = self._makeOne(conf) sock = sock_manager.get_socket() sock_manager2 = self._makeOne(conf) self.assertRaises(socket.error, sock_manager2.get_socket) del sock def test_unix_bad_sock(self): conf = UnixStreamSocketConfig('/notthere/foo.sock') sock_manager = self._makeOne(conf) self.assertRaises(socket.error, sock_manager.get_socket) def test_close_requires_prepared_socket(self): conf = InetStreamSocketConfig('127.0.0.1', 51041) sock_manager = self._makeOne(conf) self.assertFalse(sock_manager.is_prepared()) try: sock_manager._close() self.fail() except Exception, e: self.assertEqual(e.args[0], 'Socket has not been prepared') def gc_collect(): if __pypy__ is not None: gc.collect() gc.collect() gc.collect() def test_suite(): return unittest.findTestCases(sys.modules[__name__]) if __name__ == '__main__': unittest.main(defaultTest='test_suite')