123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- # coding=utf-8
- """
- @File : lib_opus.py
- @Time : 2022/8/16 下午7:24
- @Author : LLH
- @Description : opus压缩
- 业务调用流程:
- OpusEncoder* opus_encoder_create(opus_int32, int, int, int*)
- opus_int32 opus_encode(OpusEncoder*, const opus_int16*, int, unsigned char*, opus_int32)
- void opus_encoder_destroy(OpusEncoder*)
- """
- import array
- import ctypes as c
- import os
- import typing
- from util import my_ctypes
- _basedir = os.path.dirname(os.path.abspath(__file__))
- dll_fps = {
- "win32": os.path.join(_basedir, "../opus", f'libopus.dll'),
- "linux": os.path.join(_basedir, "../opus", f'libopus.so'),
- }
- class Encoder(c.Structure): # pylint: disable=too-few-public-methods
- """Opus encoder state.
- This contains the complete state of an Opus encoder.
- """
- pass
- opus_exported_functions = [
- # (func,argtypes,restype,errcheck)
- ('opus_encoder_get_size', [c.c_int], c.c_int, None),
- ('opus_strerror', [c.c_int], c.c_char_p, None),
- ('opus_encoder_create', [c.c_int, c.c_int, c.c_int, c.POINTER(c.c_int)], c.POINTER(Encoder), None),
- ('opus_encoder_ctl', [], c.c_int, None),
- ('opus_encode', [c.POINTER(Encoder), c.POINTER(c.c_int16), c.c_int, c.c_char_p, c.c_int32], c.c_int32, None),
- ('opus_encoder_destroy', [c.POINTER(Encoder)], None, None),
- ]
- # No Error
- OK = 0
- SET_SIGNAL_REQUEST = 4024
- OPUS_APPLICATION_VOIP = 2048
- OPUS_SIGNAL_VOICE = 3001
- def ctl_set(request):
- """Set new CTL value to a encoder/decoder"""
- def inner(func, obj, value):
- result_code = func(obj, request, value)
- if result_code is not OK:
- raise str(result_code)
- return inner
- # Configures the type of signal being encoded
- set_signal = ctl_set(SET_SIGNAL_REQUEST)
- class Opus:
- def __init__(self):
- self.dll = my_ctypes.lib_load_default(dll_fps, opus_exported_functions)
- def get_size(self, channels: int) -> typing.Union[int, typing.Any]:
- """Gets the size of an OpusEncoder structure."""
- if channels not in (1, 2):
- raise ValueError('Wrong channels value. Must be equal to 1 or 2')
- return self.dll.opus_encoder_get_size(channels)
- def create_state(self, fs: int, channels: int, application: int) -> c.Structure:
- """Allocates and initializes an encoder state."""
- result_code = c.c_int()
- result = self.dll.opus_encoder_create(
- fs,
- channels,
- application,
- c.byref(result_code)
- )
- if result_code.value != 0:
- raise str(self.dll.opus_strerror(result_code.value))
- return result
- def encoder_ctl(self, encoder_state: c.Structure, request, value=None) -> typing.Union[int, typing.Any]:
- if value is not None:
- return request(self.dll.opus_encoder_ctl, encoder_state, value)
- return request(self.dll.opus_encoder_ctl, encoder_state)
- def encode(self,
- encoder_state: c.Structure,
- pcm_data: bytes,
- frame_size: int,
- max_data_bytes: int
- ) -> typing.Union[bytes, typing.Any]:
- """
- Encodes an Opus Frame.
- Returns string output payload.
- Parameters:
- [in] st OpusEncoder*: Encoder state
- [in] pcm opus_int16*: Input signal (interleaved if 2 channels). length
- is frame_size*channels*sizeof(opus_int16)
- [in] frame_size int: Number of samples per channel in the input signal.
- This must be an Opus frame size for the encoder's sampling rate. For
- example, at 48 kHz the permitted values are 120, 240, 480, 960,
- 1920, and 2880. Passing in a duration of less than 10 ms
- (480 samples at 48 kHz) will prevent the encoder from using the
- LPC or hybrid modes.
- [out] data unsigned char*: Output payload. This must contain storage
- for at least max_data_bytes.
- [in] max_data_bytes opus_int32: Size of the allocated memory for the
- output payload. This may be used to impose an upper limit on the
- instant bitrate, but should not be used as the only bitrate control.
- Use OPUS_SET_BITRATE to control the bitrate.
- """
- pcm_pointer = c.cast(pcm_data, c.POINTER(c.c_int16))
- opus_data = (c.c_char * max_data_bytes)()
- result = self.dll.opus_encode(
- encoder_state,
- pcm_pointer,
- frame_size,
- opus_data,
- max_data_bytes
- )
- if result < 0:
- raise str(self.dll.opus_strerror(result))
- return array.array('b', opus_data[:result]).tobytes()
- def destroy(self, encoder_state: c.Structure):
- self.dll.opus_encoder_destroy(encoder_state)
|