lib_opus.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
# coding=utf-8
"""
@File : lib_opus.py
@Time : 2022/8/16 7:24 PM
@Author : LLH
@Description : Opus compression
Call sequence used by the application:
OpusEncoder* opus_encoder_create(opus_int32, int, int, int*)
opus_int32 opus_encode(OpusEncoder*, const opus_int16*, int, unsigned char*, opus_int32)
void opus_encoder_destroy(OpusEncoder*)
"""
  12. import array
  13. import ctypes as c
  14. import os
  15. import typing
  16. from util import my_ctypes
  17. _basedir = os.path.dirname(os.path.abspath(__file__))
  18. dll_fps = {
  19. "win32": os.path.join(_basedir, "../opus", f'libopus.dll'),
  20. "linux": os.path.join(_basedir, "../opus", f'libopus.so'),
  21. }
  22. class Encoder(c.Structure): # pylint: disable=too-few-public-methods
  23. """Opus encoder state.
  24. This contains the complete state of an Opus encoder.
  25. """
  26. pass
  27. opus_exported_functions = [
  28. # (func,argtypes,restype,errcheck)
  29. ('opus_encoder_get_size', [c.c_int], c.c_int, None),
  30. ('opus_strerror', [c.c_int], c.c_char_p, None),
  31. ('opus_encoder_create', [c.c_int, c.c_int, c.c_int, c.POINTER(c.c_int)], c.POINTER(Encoder), None),
  32. ('opus_encoder_ctl', [], c.c_int, None),
  33. ('opus_encode', [c.POINTER(Encoder), c.POINTER(c.c_int16), c.c_int, c.c_char_p, c.c_int32], c.c_int32, None),
  34. ('opus_encoder_destroy', [c.POINTER(Encoder)], None, None),
  35. ]
  36. # No Error
  37. OK = 0
  38. SET_SIGNAL_REQUEST = 4024
  39. OPUS_APPLICATION_VOIP = 2048
  40. OPUS_SIGNAL_VOICE = 3001
  41. def ctl_set(request):
  42. """Set new CTL value to a encoder/decoder"""
  43. def inner(func, obj, value):
  44. result_code = func(obj, request, value)
  45. if result_code is not OK:
  46. raise str(result_code)
  47. return inner
  48. # Configures the type of signal being encoded
  49. set_signal = ctl_set(SET_SIGNAL_REQUEST)
  50. class Opus:
  51. def __init__(self):
  52. self.dll = my_ctypes.lib_load_default(dll_fps, opus_exported_functions)
  53. def get_size(self, channels: int) -> typing.Union[int, typing.Any]:
  54. """Gets the size of an OpusEncoder structure."""
  55. if channels not in (1, 2):
  56. raise ValueError('Wrong channels value. Must be equal to 1 or 2')
  57. return self.dll.opus_encoder_get_size(channels)
  58. def create_state(self, fs: int, channels: int, application: int) -> c.Structure:
  59. """Allocates and initializes an encoder state."""
  60. result_code = c.c_int()
  61. result = self.dll.opus_encoder_create(
  62. fs,
  63. channels,
  64. application,
  65. c.byref(result_code)
  66. )
  67. if result_code.value != 0:
  68. raise str(self.dll.opus_strerror(result_code.value))
  69. return result
  70. def encoder_ctl(self, encoder_state: c.Structure, request, value=None) -> typing.Union[int, typing.Any]:
  71. if value is not None:
  72. return request(self.dll.opus_encoder_ctl, encoder_state, value)
  73. return request(self.dll.opus_encoder_ctl, encoder_state)
  74. def encode(self,
  75. encoder_state: c.Structure,
  76. pcm_data: bytes,
  77. frame_size: int,
  78. max_data_bytes: int
  79. ) -> typing.Union[bytes, typing.Any]:
  80. """
  81. Encodes an Opus Frame.
  82. Returns string output payload.
  83. Parameters:
  84. [in] st OpusEncoder*: Encoder state
  85. [in] pcm opus_int16*: Input signal (interleaved if 2 channels). length
  86. is frame_size*channels*sizeof(opus_int16)
  87. [in] frame_size int: Number of samples per channel in the input signal.
  88. This must be an Opus frame size for the encoder's sampling rate. For
  89. example, at 48 kHz the permitted values are 120, 240, 480, 960,
  90. 1920, and 2880. Passing in a duration of less than 10 ms
  91. (480 samples at 48 kHz) will prevent the encoder from using the
  92. LPC or hybrid modes.
  93. [out] data unsigned char*: Output payload. This must contain storage
  94. for at least max_data_bytes.
  95. [in] max_data_bytes opus_int32: Size of the allocated memory for the
  96. output payload. This may be used to impose an upper limit on the
  97. instant bitrate, but should not be used as the only bitrate control.
  98. Use OPUS_SET_BITRATE to control the bitrate.
  99. """
  100. pcm_pointer = c.cast(pcm_data, c.POINTER(c.c_int16))
  101. opus_data = (c.c_char * max_data_bytes)()
  102. result = self.dll.opus_encode(
  103. encoder_state,
  104. pcm_pointer,
  105. frame_size,
  106. opus_data,
  107. max_data_bytes
  108. )
  109. if result < 0:
  110. raise str(self.dll.opus_strerror(result))
  111. return array.array('b', opus_data[:result]).tobytes()
  112. def destroy(self, encoder_state: c.Structure):
  113. self.dll.opus_encoder_destroy(encoder_state)