Coverage for src / ezqt_widgets / utils / _network_utils.py: 53.95%
64 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-31 10:03 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-31 10:03 +0000
1# ///////////////////////////////////////////////////////////////
2# NETWORK_UTILS - Qt Network Helpers
3# Project: ezqt_widgets
4# ///////////////////////////////////////////////////////////////
6"""
7Qt network helper utilities.
9Provides small utilities for fetching icon bytes using QtNetwork, which
10respects system proxy settings by default.
11"""
13from __future__ import annotations
15# ///////////////////////////////////////////////////////////////
16# IMPORTS
17# ///////////////////////////////////////////////////////////////
18# Third-party imports
19from PySide6.QtCore import QEventLoop, QObject, QTimer, QUrl, Signal
20from PySide6.QtNetwork import (
21 QNetworkAccessManager,
22 QNetworkProxyFactory,
23 QNetworkReply,
24 QNetworkRequest,
25)
27# FUNCTIONS
28# ///////////////////////////////////////////////////////////////
31_network_manager_cache: dict[str, QNetworkAccessManager] = {}
34def _get_network_manager() -> QNetworkAccessManager:
35 if "instance" not in _network_manager_cache:
36 QNetworkProxyFactory.setUseSystemConfiguration(True)
37 _network_manager_cache["instance"] = QNetworkAccessManager()
38 return _network_manager_cache["instance"]
41class UrlFetcher(QObject):
42 """Fetch URL data using QtNetwork and emit a signal on completion."""
44 fetched = Signal(str, object)
46 def __init__(self, parent: QObject | None = None) -> None:
47 super().__init__(parent)
48 self._pending: dict[QNetworkReply, QTimer] = {}
50 def fetch(self, url: str, timeout_ms: int = 5000) -> None:
51 """Fetch bytes asynchronously and emit a signal on completion.
53 Args:
54 url: The URL to fetch.
55 timeout_ms: Timeout in milliseconds (default: 5000).
56 """
57 if not url: 57 ↛ 58line 57 didn't jump to line 58 because the condition on line 57 was never true
58 self.fetched.emit(url, None)
59 return
61 manager = _get_network_manager()
62 request = QNetworkRequest(QUrl(url))
63 reply = manager.get(request)
65 timer = QTimer(self)
66 timer.setSingleShot(True)
68 def _cleanup(data: bytes | None) -> None:
69 if reply in self._pending: 69 ↛ 71line 69 didn't jump to line 71 because the condition on line 69 was always true
70 self._pending.pop(reply, None)
71 timer.stop()
72 timer.deleteLater()
73 reply.deleteLater()
74 self.fetched.emit(url, data)
76 def _on_timeout() -> None:
77 reply.abort()
78 _cleanup(None)
80 def _on_finished() -> None:
81 if reply.error() != QNetworkReply.NetworkError.NoError: 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true
82 _cleanup(None)
83 return
84 data = bytes(reply.readAll().data())
85 _cleanup(data)
87 timer.timeout.connect(_on_timeout)
88 reply.finished.connect(_on_finished)
89 timer.start(timeout_ms)
90 self._pending[reply] = timer
93def fetch_url_bytes(url: str, timeout_ms: int = 5000) -> bytes | None:
94 """Fetch bytes from a URL using QtNetwork.
96 Args:
97 url: The URL to fetch.
98 timeout_ms: Timeout in milliseconds (default: 5000).
100 Returns:
101 The response bytes, or None on error/timeout.
102 """
103 if not url:
104 return None
106 manager = _get_network_manager()
107 request = QNetworkRequest(QUrl(url))
108 reply = manager.get(request)
110 loop = QEventLoop()
111 timer = QTimer()
112 timer.setSingleShot(True)
113 timer.timeout.connect(loop.quit)
114 reply.finished.connect(loop.quit)
116 timer.start(timeout_ms)
117 loop.exec()
119 if timer.isActive() and reply.error() == QNetworkReply.NetworkError.NoError:
120 data = bytes(reply.readAll().data())
121 reply.deleteLater()
122 return data
124 reply.abort()
125 reply.deleteLater()
126 return None
129# ///////////////////////////////////////////////////////////////
130# PUBLIC API
131# ///////////////////////////////////////////////////////////////
133__all__ = ["UrlFetcher", "fetch_url_bytes"]