# coding=utf-8
"""
@File : lib_opus.py
@Time : 2022/8/16 7:24 PM
@Author : LLH
@Description : Opus compression (ctypes bindings for the libopus encoder).

Typical call sequence:
    OpusEncoder* opus_encoder_create(opus_int32, int, int, int*)
    opus_int32 opus_encode(OpusEncoder*, const opus_int16*, int, unsigned char*, opus_int32)
    void opus_encoder_destroy(OpusEncoder*)
"""
import array
import ctypes as c
import os
import typing

from util import my_ctypes

_basedir = os.path.dirname(os.path.abspath(__file__))

# Shared-library location per platform key, resolved relative to this file.
dll_fps = {
    "win32": os.path.join(_basedir, "../opus", 'libopus.dll'),
    "linux": os.path.join(_basedir, "../opus", 'libopus.so'),
}


class Encoder(c.Structure):  # pylint: disable=too-few-public-methods
    """Opus encoder state.

    This contains the complete state of an Opus encoder. It is declared
    without fields and is only ever handled through pointers.
    """


# Prototypes handed to my_ctypes.lib_load_default together with dll_fps.
opus_exported_functions = [
    # (func, argtypes, restype, errcheck)
    ('opus_encoder_get_size', [c.c_int], c.c_int, None),
    ('opus_strerror', [c.c_int], c.c_char_p, None),
    ('opus_encoder_create',
     [c.c_int, c.c_int, c.c_int, c.POINTER(c.c_int)],
     c.POINTER(Encoder), None),
    # opus_encoder_ctl is variadic in the C API, so no fixed argtypes are
    # declared for it here.
    ('opus_encoder_ctl', [], c.c_int, None),
    ('opus_encode',
     [c.POINTER(Encoder), c.POINTER(c.c_int16), c.c_int, c.c_char_p, c.c_int32],
     c.c_int32, None),
    ('opus_encoder_destroy', [c.POINTER(Encoder)], None, None),
]

# No Error
OK = 0

SET_SIGNAL_REQUEST = 4024

OPUS_APPLICATION_VOIP = 2048
OPUS_SIGNAL_VOICE = 3001


class OpusError(Exception):
    """Raised when a libopus call reports a failure.

    Attributes:
        code: The numeric result code returned by the library.
    """

    def __init__(self, code: int, message: str = ""):
        self.code = code
        super().__init__(message or f"libopus error code {code}")


def ctl_set(request):
    """Set new CTL value to a encoder/decoder.

    Returns a callable ``inner(func, obj, value)`` that invokes
    ``func(obj, request, value)`` and raises :class:`OpusError` when the
    returned code differs from ``OK``.
    """

    def inner(func, obj, value):
        result_code = func(obj, request, value)
        # Compare by value: identity comparison of ints only works by
        # accident of CPython's small-integer cache.
        if result_code != OK:
            raise OpusError(
                result_code,
                f"opus ctl request {request} failed with code {result_code}",
            )

    return inner


# Configures the type of signal being encoded
set_signal = ctl_set(SET_SIGNAL_REQUEST)


class Opus:
    """Thin wrapper around the libopus encoder entry points."""

    def __init__(self):
        """Load the shared library and bind the exported functions."""
        self.dll = my_ctypes.lib_load_default(dll_fps, opus_exported_functions)

    def _error(self, code: int) -> OpusError:
        """Build an OpusError for *code* using opus_strerror for the text."""
        raw = self.dll.opus_strerror(code)
        # c_char_p results arrive as bytes (or None); fall back to str() for
        # anything else so error reporting itself can never fail.
        if isinstance(raw, bytes):
            text = raw.decode("utf-8", "replace")
        else:
            text = str(raw)
        return OpusError(code, text)

    def get_size(self, channels: int) -> typing.Union[int, typing.Any]:
        """Gets the size of an OpusEncoder structure.

        Raises:
            ValueError: if *channels* is neither 1 nor 2.
        """
        if channels not in (1, 2):
            raise ValueError('Wrong channels value. Must be equal to 1 or 2')
        return self.dll.opus_encoder_get_size(channels)

    def create_state(self, fs: int, channels: int, application: int) -> c.Structure:
        """Allocates and initializes an encoder state.

        Parameters:
            fs: Sampling rate passed to opus_encoder_create.
            channels: Channel count passed to opus_encoder_create.
            application: Application mode, e.g. OPUS_APPLICATION_VOIP.

        Returns:
            The value returned by opus_encoder_create (declared above as a
            pointer to Encoder).

        Raises:
            OpusError: if the library reports a non-OK result code.
        """
        result_code = c.c_int()
        result = self.dll.opus_encoder_create(
            fs,
            channels,
            application,
            c.byref(result_code),  # out-parameter receiving the status code
        )
        if result_code.value != OK:
            raise self._error(result_code.value)
        return result

    def encoder_ctl(self,
                    encoder_state: c.Structure,
                    request,
                    value=None) -> typing.Union[int, typing.Any]:
        """Run a CTL *request* (e.g. ``set_signal``) against *encoder_state*.

        *request* is a callable such as the ones produced by ``ctl_set``; it
        receives the raw ``opus_encoder_ctl`` function, the encoder state and,
        when given, *value*.
        """
        if value is not None:
            return request(self.dll.opus_encoder_ctl, encoder_state, value)
        return request(self.dll.opus_encoder_ctl, encoder_state)

    def encode(self,
               encoder_state: c.Structure,
               pcm_data: bytes,
               frame_size: int,
               max_data_bytes: int
               ) -> typing.Union[bytes, typing.Any]:
        """
        Encodes an Opus Frame.

        Returns the encoded payload as bytes.

        Parameters:
        [in] st OpusEncoder*: Encoder state
        [in] pcm opus_int16*: Input signal (interleaved if 2 channels). length
            is frame_size*channels*sizeof(opus_int16)
        [in] frame_size int: Number of samples per channel in the input signal.
            This must be an Opus frame size for the encoder's sampling rate.
            For example, at 48 kHz the permitted values are 120, 240, 480, 960,
            1920, and 2880. Passing in a duration of less than 10 ms
            (480 samples at 48 kHz) will prevent the encoder from using the LPC
            or hybrid modes.
        [out] data unsigned char*: Output payload. This must contain storage
            for at least max_data_bytes.
        [in] max_data_bytes opus_int32: Size of the allocated memory for the
            output payload. This may be used to impose an upper limit on the
            instant bitrate, but should not be used as the only bitrate
            control. Use OPUS_SET_BITRATE to control the bitrate.

        Raises:
            OpusError: if opus_encode returns a negative result.
        """
        pcm_pointer = c.cast(pcm_data, c.POINTER(c.c_int16))
        opus_data = (c.c_char * max_data_bytes)()

        result = self.dll.opus_encode(
            encoder_state,
            pcm_pointer,
            frame_size,
            opus_data,
            max_data_bytes,
        )
        # A negative return is an error code; otherwise it is the number of
        # bytes written into opus_data.
        if result < 0:
            raise self._error(result)

        # Copy out only the bytes the encoder actually produced.
        return array.array('b', opus_data[:result]).tobytes()

    def destroy(self, encoder_state: c.Structure):
        """Release an encoder state via opus_encoder_destroy."""
        self.dll.opus_encoder_destroy(encoder_state)