Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/imageio-2.35.1-py3.8.egg/imageio/core/util.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

266 statements  

1# -*- coding: utf-8 -*- 

2# imageio is distributed under the terms of the (new) BSD License. 

3 

4""" 

5Various utilities for imageio 

6""" 

7 

8 

9from collections import OrderedDict 

10import numpy as np 

11import os 

12import re 

13import struct 

14import sys 

15import time 

16import logging 

17 

18 

19logger = logging.getLogger("imageio") 

20 

21IS_PYPY = "__pypy__" in sys.builtin_module_names 

22THIS_DIR = os.path.abspath(os.path.dirname(__file__)) 

23 

24 

25def urlopen(*args, **kwargs): 

26 """Compatibility function for the urlopen function. Raises an 

27 RuntimeError if urlopen could not be imported (which can occur in 

28 frozen applications. 

29 """ 

30 try: 

31 from urllib.request import urlopen 

32 except ImportError: 

33 raise RuntimeError("Could not import urlopen.") 

34 return urlopen(*args, **kwargs) 

35 

36 

37def _precision_warn(p1, p2, extra=""): 

38 t = ( 

39 "Lossy conversion from {} to {}. {} Convert image to {} prior to " 

40 "saving to suppress this warning." 

41 ) 

42 logger.warning(t.format(p1, p2, extra, p2)) 

43 

44 

45def image_as_uint(im, bitdepth=None): 

46 """Convert the given image to uint (default: uint8) 

47 

48 If the dtype already matches the desired format, it is returned 

49 as-is. If the image is float, and all values are between 0 and 1, 

50 the values are multiplied by np.power(2.0, bitdepth). In all other 

51 situations, the values are scaled such that the minimum value 

52 becomes 0 and the maximum value becomes np.power(2.0, bitdepth)-1 

53 (255 for 8-bit and 65535 for 16-bit). 

54 """ 

55 if not bitdepth: 

56 bitdepth = 8 

57 if not isinstance(im, np.ndarray): 

58 raise ValueError("Image must be a numpy array") 

59 if bitdepth == 8: 

60 out_type = np.uint8 

61 elif bitdepth == 16: 

62 out_type = np.uint16 

63 else: 

64 raise ValueError("Bitdepth must be either 8 or 16") 

65 dtype_str1 = str(im.dtype) 

66 dtype_str2 = out_type.__name__ 

67 if (im.dtype == np.uint8 and bitdepth == 8) or ( 

68 im.dtype == np.uint16 and bitdepth == 16 

69 ): 

70 # Already the correct format? Return as-is 

71 return im 

72 if dtype_str1.startswith("float") and np.nanmin(im) >= 0 and np.nanmax(im) <= 1: 

73 _precision_warn(dtype_str1, dtype_str2, "Range [0, 1].") 

74 im = im.astype(np.float64) * (np.power(2.0, bitdepth) - 1) + 0.499999999 

75 elif im.dtype == np.uint16 and bitdepth == 8: 

76 _precision_warn(dtype_str1, dtype_str2, "Losing 8 bits of resolution.") 

77 im = np.right_shift(im, 8) 

78 elif im.dtype == np.uint32: 

79 _precision_warn( 

80 dtype_str1, 

81 dtype_str2, 

82 "Losing {} bits of resolution.".format(32 - bitdepth), 

83 ) 

84 im = np.right_shift(im, 32 - bitdepth) 

85 elif im.dtype == np.uint64: 

86 _precision_warn( 

87 dtype_str1, 

88 dtype_str2, 

89 "Losing {} bits of resolution.".format(64 - bitdepth), 

90 ) 

91 im = np.right_shift(im, 64 - bitdepth) 

92 else: 

93 mi = np.nanmin(im) 

94 ma = np.nanmax(im) 

95 if not np.isfinite(mi): 

96 raise ValueError("Minimum image value is not finite") 

97 if not np.isfinite(ma): 

98 raise ValueError("Maximum image value is not finite") 

99 if ma == mi: 

100 return im.astype(out_type) 

101 _precision_warn(dtype_str1, dtype_str2, "Range [{}, {}].".format(mi, ma)) 

102 # Now make float copy before we scale 

103 im = im.astype("float64") 

104 # Scale the values between 0 and 1 then multiply by the max value 

105 im = (im - mi) / (ma - mi) * (np.power(2.0, bitdepth) - 1) + 0.499999999 

106 assert np.nanmin(im) >= 0 

107 assert np.nanmax(im) < np.power(2.0, bitdepth) 

108 return im.astype(out_type) 

109 

110 

111class Array(np.ndarray): 

112 """Array(array, meta=None) 

113 

114 A subclass of np.ndarray that has a meta attribute. Get the dictionary 

115 that contains the meta data using ``im.meta``. Convert to a plain numpy 

116 array using ``np.asarray(im)``. 

117 

118 """ 

119 

120 def __new__(cls, array, meta=None): 

121 # Check 

122 if not isinstance(array, np.ndarray): 

123 raise ValueError("Array expects a numpy array.") 

124 if not (meta is None or isinstance(meta, dict)): 

125 raise ValueError("Array expects meta data to be a dict.") 

126 # Convert and return 

127 meta = meta if meta is not None else getattr(array, "meta", {}) 

128 try: 

129 ob = array.view(cls) 

130 except AttributeError: # pragma: no cover 

131 # Just return the original; no metadata on the array in Pypy! 

132 return array 

133 ob._copy_meta(meta) 

134 return ob 

135 

136 def _copy_meta(self, meta): 

137 """Make a 2-level deep copy of the meta dictionary.""" 

138 self._meta = Dict() 

139 for key, val in meta.items(): 

140 if isinstance(val, dict): 

141 val = Dict(val) # Copy this level 

142 self._meta[key] = val 

143 

144 @property 

145 def meta(self): 

146 """The dict with the meta data of this image.""" 

147 return self._meta 

148 

149 def __array_finalize__(self, ob): 

150 """So the meta info is maintained when doing calculations with 

151 the array. 

152 """ 

153 if isinstance(ob, Array): 

154 self._copy_meta(ob.meta) 

155 else: 

156 self._copy_meta({}) 

157 

158 def __array_wrap__(self, out, context=None): 

159 """So that we return a native numpy array (or scalar) when a 

160 reducting ufunc is applied (such as sum(), std(), etc.) 

161 """ 

162 if not out.shape: 

163 return out.dtype.type(out) # Scalar 

164 elif out.shape != self.shape: 

165 return out.view(type=np.ndarray) 

166 elif not isinstance(out, Array): 

167 return Array(out, self.meta) 

168 else: 

169 return out # Type Array 

170 

171 

172Image = Array # Alias for backwards compatibility 

173 

174 

175def asarray(a): 

176 """Pypy-safe version of np.asarray. Pypy's np.asarray consumes a 

177 *lot* of memory if the given array is an ndarray subclass. This 

178 function does not. 

179 """ 

180 if isinstance(a, np.ndarray): 

181 if IS_PYPY: # pragma: no cover 

182 a = a.copy() # pypy has issues with base views 

183 plain = a.view(type=np.ndarray) 

184 return plain 

185 return np.asarray(a) 

186 

187 

188class Dict(OrderedDict): 

189 """A dict in which the keys can be get and set as if they were 

190 attributes. Very convenient in combination with autocompletion. 

191 

192 This Dict still behaves as much as possible as a normal dict, and 

193 keys can be anything that are otherwise valid keys. However, 

194 keys that are not valid identifiers or that are names of the dict 

195 class (such as 'items' and 'copy') cannot be get/set as attributes. 

196 """ 

197 

198 __reserved_names__ = dir(OrderedDict()) # Also from OrderedDict 

199 __pure_names__ = dir(dict()) 

200 

201 def __getattribute__(self, key): 

202 try: 

203 return object.__getattribute__(self, key) 

204 except AttributeError: 

205 if key in self: 

206 return self[key] 

207 else: 

208 raise 

209 

210 def __setattr__(self, key, val): 

211 if key in Dict.__reserved_names__: 

212 # Either let OrderedDict do its work, or disallow 

213 if key not in Dict.__pure_names__: 

214 return OrderedDict.__setattr__(self, key, val) 

215 else: 

216 raise AttributeError( 

217 "Reserved name, this key can only " 

218 + "be set via ``d[%r] = X``" % key 

219 ) 

220 else: 

221 # if isinstance(val, dict): val = Dict(val) -> no, makes a copy! 

222 self[key] = val 

223 

224 def __dir__(self): 

225 def isidentifier(x): 

226 return bool(re.match(r"[a-z_]\w*$", x, re.I)) 

227 

228 names = [k for k in self.keys() if (isinstance(k, str) and isidentifier(k))] 

229 return Dict.__reserved_names__ + names 

230 

231 

232class BaseProgressIndicator(object): 

233 """BaseProgressIndicator(name) 

234 

235 A progress indicator helps display the progress of a task to the 

236 user. Progress can be pending, running, finished or failed. 

237 

238 Each task has: 

239 * a name - a short description of what needs to be done. 

240 * an action - the current action in performing the task (e.g. a subtask) 

241 * progress - how far the task is completed 

242 * max - max number of progress units. If 0, the progress is indefinite 

243 * unit - the units in which the progress is counted 

244 * status - 0: pending, 1: in progress, 2: finished, 3: failed 

245 

246 This class defines an abstract interface. Subclasses should implement 

247 _start, _stop, _update_progress(progressText), _write(message). 

248 """ 

249 

250 def __init__(self, name): 

251 self._name = name 

252 self._action = "" 

253 self._unit = "" 

254 self._max = 0 

255 self._status = 0 

256 self._last_progress_update = 0 

257 

258 def start(self, action="", unit="", max=0): 

259 """start(action='', unit='', max=0) 

260 

261 Start the progress. Optionally specify an action, a unit, 

262 and a maximum progress value. 

263 """ 

264 if self._status == 1: 

265 self.finish() 

266 self._action = action 

267 self._unit = unit 

268 self._max = max 

269 # 

270 self._progress = 0 

271 self._status = 1 

272 self._start() 

273 

274 def status(self): 

275 """status() 

276 

277 Get the status of the progress - 0: pending, 1: in progress, 

278 2: finished, 3: failed 

279 """ 

280 return self._status 

281 

282 def set_progress(self, progress=0, force=False): 

283 """set_progress(progress=0, force=False) 

284 

285 Set the current progress. To avoid unnecessary progress updates 

286 this will only have a visual effect if the time since the last 

287 update is > 0.1 seconds, or if force is True. 

288 """ 

289 self._progress = progress 

290 # Update or not? 

291 if not (force or (time.time() - self._last_progress_update > 0.1)): 

292 return 

293 self._last_progress_update = time.time() 

294 # Compose new string 

295 unit = self._unit or "" 

296 progressText = "" 

297 if unit == "%": 

298 progressText = "%2.1f%%" % progress 

299 elif self._max > 0: 

300 percent = 100 * float(progress) / self._max 

301 progressText = "%i/%i %s (%2.1f%%)" % (progress, self._max, unit, percent) 

302 elif progress > 0: 

303 if isinstance(progress, float): 

304 progressText = "%0.4g %s" % (progress, unit) 

305 else: 

306 progressText = "%i %s" % (progress, unit) 

307 # Update 

308 self._update_progress(progressText) 

309 

310 def increase_progress(self, extra_progress): 

311 """increase_progress(extra_progress) 

312 

313 Increase the progress by a certain amount. 

314 """ 

315 self.set_progress(self._progress + extra_progress) 

316 

317 def finish(self, message=None): 

318 """finish(message=None) 

319 

320 Finish the progress, optionally specifying a message. This will 

321 not set the progress to the maximum. 

322 """ 

323 self.set_progress(self._progress, True) # fore update 

324 self._status = 2 

325 self._stop() 

326 if message is not None: 

327 self._write(message) 

328 

329 def fail(self, message=None): 

330 """fail(message=None) 

331 

332 Stop the progress with a failure, optionally specifying a message. 

333 """ 

334 self.set_progress(self._progress, True) # fore update 

335 self._status = 3 

336 self._stop() 

337 message = "FAIL " + (message or "") 

338 self._write(message) 

339 

340 def write(self, message): 

341 """write(message) 

342 

343 Write a message during progress (such as a warning). 

344 """ 

345 if self.__class__ == BaseProgressIndicator: 

346 # When this class is used as a dummy, print explicit message 

347 print(message) 

348 else: 

349 return self._write(message) 

350 

351 # Implementing classes should implement these 

352 

353 def _start(self): 

354 pass 

355 

356 def _stop(self): 

357 pass 

358 

359 def _update_progress(self, progressText): 

360 pass 

361 

362 def _write(self, message): 

363 pass 

364 

365 

366class StdoutProgressIndicator(BaseProgressIndicator): 

367 """StdoutProgressIndicator(name) 

368 

369 A progress indicator that shows the progress in stdout. It 

370 assumes that the tty can appropriately deal with backspace 

371 characters. 

372 """ 

373 

374 def _start(self): 

375 self._chars_prefix, self._chars = "", "" 

376 # Write message 

377 if self._action: 

378 self._chars_prefix = "%s (%s): " % (self._name, self._action) 

379 else: 

380 self._chars_prefix = "%s: " % self._name 

381 sys.stdout.write(self._chars_prefix) 

382 sys.stdout.flush() 

383 

384 def _update_progress(self, progressText): 

385 # If progress is unknown, at least make something move 

386 if not progressText: 

387 i1, i2, i3, i4 = "-\\|/" 

388 M = {i1: i2, i2: i3, i3: i4, i4: i1} 

389 progressText = M.get(self._chars, i1) 

390 # Store new string and write 

391 delChars = "\b" * len(self._chars) 

392 self._chars = progressText 

393 sys.stdout.write(delChars + self._chars) 

394 sys.stdout.flush() 

395 

396 def _stop(self): 

397 self._chars = self._chars_prefix = "" 

398 sys.stdout.write("\n") 

399 sys.stdout.flush() 

400 

401 def _write(self, message): 

402 # Write message 

403 delChars = "\b" * len(self._chars_prefix + self._chars) 

404 sys.stdout.write(delChars + " " + message + "\n") 

405 # Reprint progress text 

406 sys.stdout.write(self._chars_prefix + self._chars) 

407 sys.stdout.flush() 

408 

409 

410# From pyzolib/paths.py (https://bitbucket.org/pyzo/pyzolib/src/tip/paths.py) 

411def appdata_dir(appname=None, roaming=False): 

412 """appdata_dir(appname=None, roaming=False) 

413 

414 Get the path to the application directory, where applications are allowed 

415 to write user specific files (e.g. configurations). For non-user specific 

416 data, consider using common_appdata_dir(). 

417 If appname is given, a subdir is appended (and created if necessary). 

418 If roaming is True, will prefer a roaming directory (Windows Vista/7). 

419 """ 

420 

421 # Define default user directory 

422 userDir = os.getenv("IMAGEIO_USERDIR", None) 

423 if userDir is None: 

424 userDir = os.path.expanduser("~") 

425 if not os.path.isdir(userDir): # pragma: no cover 

426 userDir = "/var/tmp" # issue #54 

427 

428 # Get system app data dir 

429 path = None 

430 if sys.platform.startswith("win"): 

431 path1, path2 = os.getenv("LOCALAPPDATA"), os.getenv("APPDATA") 

432 path = (path2 or path1) if roaming else (path1 or path2) 

433 elif sys.platform.startswith("darwin"): 

434 path = os.path.join(userDir, "Library", "Application Support") 

435 # On Linux and as fallback 

436 if not (path and os.path.isdir(path)): 

437 path = userDir 

438 

439 # Maybe we should store things local to the executable (in case of a 

440 # portable distro or a frozen application that wants to be portable) 

441 prefix = sys.prefix 

442 if getattr(sys, "frozen", None): 

443 prefix = os.path.abspath(os.path.dirname(sys.executable)) 

444 for reldir in ("settings", "../settings"): 

445 localpath = os.path.abspath(os.path.join(prefix, reldir)) 

446 if os.path.isdir(localpath): # pragma: no cover 

447 try: 

448 open(os.path.join(localpath, "test.write"), "wb").close() 

449 os.remove(os.path.join(localpath, "test.write")) 

450 except IOError: 

451 pass # We cannot write in this directory 

452 else: 

453 path = localpath 

454 break 

455 

456 # Get path specific for this app 

457 if appname: 

458 if path == userDir: 

459 appname = "." + appname.lstrip(".") # Make it a hidden directory 

460 path = os.path.join(path, appname) 

461 if not os.path.isdir(path): # pragma: no cover 

462 os.makedirs(path, exist_ok=True) 

463 

464 # Done 

465 return path 

466 

467 

468def resource_dirs(): 

469 """resource_dirs() 

470 

471 Get a list of directories where imageio resources may be located. 

472 The first directory in this list is the "resources" directory in 

473 the package itself. The second directory is the appdata directory 

474 (~/.imageio on Linux). The list further contains the application 

475 directory (for frozen apps), and may include additional directories 

476 in the future. 

477 """ 

478 dirs = [resource_package_dir()] 

479 # Resource dir baked in the package. 

480 # Appdata directory 

481 try: 

482 dirs.append(appdata_dir("imageio")) 

483 except Exception: # pragma: no cover 

484 pass # The home dir may not be writable 

485 # Directory where the app is located (mainly for frozen apps) 

486 if getattr(sys, "frozen", None): 

487 dirs.append(os.path.abspath(os.path.dirname(sys.executable))) 

488 elif sys.path and sys.path[0]: 

489 dirs.append(os.path.abspath(sys.path[0])) 

490 return dirs 

491 

492 

493def resource_package_dir(): 

494 """package_dir 

495 

496 Get the resources directory in the imageio package installation 

497 directory. 

498 

499 Notes 

500 ----- 

501 This is a convenience method that is used by `resource_dirs` and 

502 imageio entry point scripts. 

503 """ 

504 # Make pkg_resources optional if setuptools is not available 

505 try: 

506 # Avoid importing pkg_resources in the top level due to how slow it is 

507 # https://github.com/pypa/setuptools/issues/510 

508 import pkg_resources 

509 except ImportError: 

510 pkg_resources = None 

511 

512 if pkg_resources: 

513 # The directory returned by `pkg_resources.resource_filename` 

514 # also works with eggs. 

515 pdir = pkg_resources.resource_filename("imageio", "resources") 

516 else: 

517 # If setuptools is not available, use fallback 

518 pdir = os.path.abspath(os.path.join(THIS_DIR, "..", "resources")) 

519 return pdir 

520 

521 

522def get_platform(): 

523 """get_platform() 

524 

525 Get a string that specifies the platform more specific than 

526 sys.platform does. The result can be: linux32, linux64, win32, 

527 win64, osx32, osx64. Other platforms may be added in the future. 

528 """ 

529 # Get platform 

530 if sys.platform.startswith("linux"): 

531 plat = "linux%i" 

532 elif sys.platform.startswith("win"): 

533 plat = "win%i" 

534 elif sys.platform.startswith("darwin"): 

535 plat = "osx%i" 

536 elif sys.platform.startswith("freebsd"): 

537 plat = "freebsd%i" 

538 else: # pragma: no cover 

539 return None 

540 

541 return plat % (struct.calcsize("P") * 8) # 32 or 64 bits 

542 

543 

544def has_module(module_name): 

545 """Check to see if a python module is available.""" 

546 if sys.version_info > (3, 4): 

547 import importlib 

548 

549 name_parts = module_name.split(".") 

550 for i in range(len(name_parts)): 

551 if importlib.util.find_spec(".".join(name_parts[: i + 1])) is None: 

552 return False 

553 return True 

554 else: # pragma: no cover 

555 import imp 

556 

557 try: 

558 imp.find_module(module_name) 

559 except ImportError: 

560 return False 

561 return True