简介:本文详细介绍如何使用Python搭建外呼系统,涵盖系统架构设计、核心组件实现、第三方服务集成及性能优化策略,提供完整代码示例与部署方案。
外呼系统需满足高并发、低延迟、稳定可靠的核心需求,其技术架构可分为四层:
系统关键指标设计:
使用pjsip库构建SIP信令处理模块:
import pjsua as pj


class SipAccountCallback(pj.AccountCallback):
    """Account callback that automatically answers every incoming call."""

    def on_incoming_call(self, call):
        """Print the caller's URI and answer the call with SIP status 200."""
        print("Incoming call from:", call.info().remote_uri)
        # The legacy ``pjsua`` binding takes the status code directly;
        # ``CallOpParam`` only exists in the newer ``pjsua2`` API.
        call.answer(200)


class SipClient:
    """Boot the PJSUA library on UDP/5060 and register a single account."""

    def __init__(self, account_config):
        """Initialise the SIP stack and create the account.

        Args:
            account_config: Account settings, passed unchanged to
                ``Lib.create_account()``.

        Raises:
            pj.Error: If the library, transport or account cannot be set up.
        """
        self.lib = pj.Lib()
        try:
            self.lib.init()
            # create_transport() expects a TransportConfig, not a bare port.
            self.lib.create_transport(
                pj.TransportType.UDP, pj.TransportConfig(5060)
            )
            self.lib.start()
            self.acc = self.lib.create_account(
                account_config, cb=SipAccountCallback()
            )
        except pj.Error:
            # pj.Lib is a process-wide singleton; release it so a later
            # attempt can create a fresh instance.
            self.lib.destroy()
            raise

    def make_call(self, target_uri):
        """Place an outbound call to ``target_uri`` and return the call object."""
        # Outbound calls are placed from the account itself in pjsua.
        return self.acc.make_call(target_uri)
基于Redis实现分布式任务队列:
import json
import logging
import os
import time
from threading import Thread

import redis

logger = logging.getLogger(__name__)


class CallScheduler:
    """Redis-backed FIFO queue of outbound-call tasks with worker threads."""

    def __init__(self):
        """Connect to Redis and prepare an empty worker list.

        The host is read from the ``REDIS_HOST`` environment variable and
        falls back to ``localhost`` when it is not set.
        """
        self.redis = redis.StrictRedis(
            host=os.environ.get('REDIS_HOST', 'localhost'), port=6379, db=0
        )
        self.worker_threads = []

    def add_task(self, phone_number, campaign_id):
        """Append a pending call task to the tail of ``call_queue``.

        Args:
            phone_number: Number to dial; stored under the ``phone`` key.
            campaign_id: Campaign identifier; stored under ``campaign``.
        """
        task = {
            'phone': phone_number,
            'campaign': campaign_id,
            'status': 'pending',
            'timestamp': time.time(),
        }
        self.redis.rpush('call_queue', json.dumps(task))

    def worker(self):
        """Consume tasks forever, recording each outcome in ``call:<phone>``."""
        while True:
            popped = self.redis.blpop('call_queue', timeout=10)
            if popped is None:
                # blpop returns None when the timeout expires with an empty
                # queue; unpacking it would kill the worker thread.
                continue
            _, task_json = popped

            try:
                task = json.loads(task_json)
                status_key = f'call:{task["phone"]}'
            except (ValueError, KeyError, TypeError):
                logger.exception('Discarding malformed task: %r', task_json)
                continue

            try:
                # process_call() is not defined in this snippet; it must be
                # supplied by a subclass or added to this class.
                self.process_call(task)
            except Exception:
                # Boundary of the worker loop: log and keep the thread alive.
                logger.exception('Call task failed: %r', task)
                self.redis.hset(status_key, 'status', 'failed')
            else:
                self.redis.hset(status_key, 'status', 'completed')

    def start_workers(self, count=5):
        """Start ``count`` daemon threads, each running :meth:`worker`."""
        for _ in range(count):
            t = Thread(target=self.worker)
            t.daemon = True
            t.start()
            self.worker_threads.append(t)
使用MySQL存储通话记录:
import os
from datetime import datetime

import pymysql


class CallRecorder:
    """Persist call records into the ``call_records`` MySQL table."""

    def __init__(self):
        """Open the MySQL connection used for all inserts.

        The host is read from the ``DB_HOST`` environment variable and falls
        back to ``localhost`` when it is not set.
        """
        # NOTE(review): one connection is held for the object's lifetime;
        # do not share a CallRecorder between threads.
        self.conn = pymysql.connect(
            host=os.environ.get('DB_HOST', 'localhost'),
            user='callcenter',
            password='securepass',
            database='callcenter',  # 'db' is only a deprecated alias
            charset='utf8mb4',
        )

    def log_call(self, call_id, phone, duration, status):
        """Insert one call record stamped with the current local time.

        Args:
            call_id: Value for the ``call_id`` column.
            phone: Value for the ``phone_number`` column.
            duration: Value for the ``duration`` column.
            status: Value for the ``status`` column.

        Raises:
            pymysql.MySQLError: If the insert or commit fails; the
                transaction is rolled back before re-raising.
        """
        sql = """INSERT INTO call_records
(call_id, phone_number, duration, status, create_time)
VALUES (%s, %s, %s, %s, %s)"""
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(
                    sql, (call_id, phone, duration, status, datetime.now())
                )
            self.conn.commit()
        except pymysql.MySQLError:
            # Leave the connection in a clean state for the next insert.
            self.conn.rollback()
            raise
按照G.711所要求的8 kHz单声道采样率,使用PyAudio采集与播放16位PCM音频(示例代码每次处理160个采样点,即20 ms;G.711编解码本身未包含在示例中):
import pyaudio
import numpy as np


class AudioProcessor:
    """Full-duplex 8 kHz mono 16-bit audio loop with a simple DSP stage."""

    def __init__(self):
        """Open one PyAudio stream for both capture and playback."""
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=8000,
            input=True,
            output=True,
            frames_per_buffer=160,
        )

    def process_audio(self):
        """Read 160-frame chunks forever, process them and play them back."""
        while True:
            data = self.stream.read(160)
            # Hook for echo cancellation, noise reduction, etc.
            processed = self.apply_dsp(data)
            self.stream.write(processed)

    def apply_dsp(self, data):
        """Apply a 0.8 gain to a buffer of 16-bit samples.

        Args:
            data: Raw bytes holding int16 samples.

        Returns:
            Bytes of the same length holding the attenuated int16 samples.
        """
        samples = np.frombuffer(data, dtype=np.int16)
        # Multiplying by a float promotes the array to float64; cast back to
        # int16 so the output keeps 2 bytes per sample as the stream expects.
        attenuated = (samples * 0.8).astype(np.int16)
        return attenuated.tobytes()
采用滑动窗口算法实现速率限制(在最近 per 秒内最多放行 rate 次调用):
import time
from collections import deque


class RateLimiter:
    """Sliding-window limiter: at most ``rate`` calls per ``per`` seconds."""

    def __init__(self, rate, per, clock=time.monotonic):
        """Create a limiter.

        Args:
            rate: Maximum number of accepted calls inside one window.
            per: Window length in seconds.
            clock: Zero-argument callable returning the current time in
                seconds. Defaults to ``time.monotonic`` so that wall-clock
                adjustments cannot expire or extend the window.
        """
        self.tokens = deque()  # timestamps of accepted calls, oldest first
        self.rate = rate
        self.per = per
        self._clock = clock

    def consume(self):
        """Try to record one call.

        Returns:
            True if the call fits in the current window (and is recorded),
            False if the window already holds ``rate`` calls.
        """
        now = self._clock()
        cutoff = now - self.per
        # Drop timestamps that have left the window.
        while self.tokens and self.tokens[0] <= cutoff:
            self.tokens.popleft()
        if len(self.tokens) < self.rate:
            self.tokens.append(now)
            return True
        return False
使用Docker Compose编排服务:
# Docker Compose stack: SIP proxy, call engine, Redis queue and MySQL store.
version: '3.8'

services:
  sip-proxy:
    image: opensips:latest
    ports:
      - "5060:5060/udp"
    volumes:
      - ./opensips.cfg:/etc/opensips/opensips.cfg

  call-engine:
    build: ./call-engine
    environment:
      - REDIS_HOST=redis
      - DB_HOST=mysql
    depends_on:
      - redis
      - mysql

  redis:
    image: redis:6-alpine

  mysql:
    image: mysql:8
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: callcenter
      # Application account the Python code connects with; without these
      # two variables the image only creates the root user.
      MYSQL_USER: callcenter
      MYSQL_PASSWORD: securepass
class DBPool:
    """Holder for one lazily created, class-wide database connection pool."""

    # Cached pool; stays None until get_pool() builds it.
    _pool = None

    @classmethod
    def get_pool(cls):
        """Return the shared pool, creating it on the first call."""
        if cls._pool:
            return cls._pool
        # NOTE(review): PooledDB and pymysql are not imported in this
        # snippet; presumably they are imported elsewhere in the module.
        pool_settings = dict(
            creator=pymysql,
            maxconnections=20,
            mincached=5,
            host='localhost',
            user='callcenter',
            password='securepass',
            database='callcenter',
        )
        cls._pool = PooledDB(**pool_settings)
        return cls._pool
2. **异步处理**:采用Celery实现异步任务队列

```python
from celery import Celery

app = Celery('callcenter', broker='redis://localhost:6379/0')

@app.task
def process_call_async(call_data):
    # 异步处理呼叫逻辑
    pass
```
使用Prometheus+Grafana构建监控体系:
from prometheus_client import start_http_server, Gauge


class CallMetrics:
    """Prometheus gauges describing call activity."""

    def __init__(self):
        """Register the three gauges exported by this class."""
        self.active_calls = Gauge(
            'active_calls',
            'Number of active calls',
        )
        self.call_success = Gauge(
            'call_success_rate',
            'Success rate of calls',
        )
        self.avg_duration = Gauge(
            'avg_call_duration',
            'Average call duration',
        )

    def update_metrics(self):
        """Refresh the success-rate and average-duration gauges.

        Both values are fetched before either gauge is written.
        """
        # NOTE(review): get_success_rate() / get_avg_duration() are not
        # defined in this snippet; presumably they query the database.
        rate, duration = self.get_success_rate(), self.get_avg_duration()
        self.call_success.set(rate)
        self.avg_duration.set(duration)
使用ELK Stack实现日志集中管理:
import logging
from datetime import datetime, timezone

from elasticsearch import Elasticsearch


class ESLogger:
    """Write call events to the ``call-logs`` Elasticsearch index."""

    def __init__(self):
        """Create the Elasticsearch client and the ``callcenter`` logger."""
        # Recent elasticsearch-py releases reject host strings without an
        # explicit scheme, so spell out the full URL.
        self.es = Elasticsearch(['http://localhost:9200'])
        self.logger = logging.getLogger('callcenter')
        self.logger.setLevel(logging.INFO)

    def log_call(self, call_id, event, details):
        """Index one call event.

        Args:
            call_id: Stored under ``call_id``.
            event: Stored under ``event``.
            details: Stored under ``details``.
        """
        doc = {
            # Timezone-aware UTC timestamp: an offset-less value would be
            # interpreted as UTC by Elasticsearch even though it was local.
            '@timestamp': datetime.now(timezone.utc).isoformat(),
            'call_id': call_id,
            'event': event,
            'details': details,
        }
        self.es.index(index='call-logs', document=doc)
import re

# Mainland-China mobile number: 11 digits, starting with 1 and then 3-9.
_MOBILE_PATTERN = re.compile(r'^1[3-9]\d{9}$')


def mask_phone_number(phone):
    """Hide the middle four digits of a mobile number.

    Args:
        phone: Phone number as a string.

    Returns:
        ``phone`` with digits 4-7 replaced by ``****`` when it matches the
        11-digit mobile pattern; otherwise ``phone`` unchanged.
    """
    if _MOBILE_PATTERN.match(phone):
        return phone[:3] + '****' + phone[7:]
    return phone
2. **通话录音加密**:

```python
from cryptography.fernet import Fernet

class AudioEncryptor:
    def __init__(self):
        self.key = Fernet.generate_key()
        self.cipher = Fernet(self.key)

    def encrypt_recording(self, audio_data):
        return self.cipher.encrypt(audio_data)

    def decrypt_recording(self, encrypted_data):
        return self.cipher.decrypt(encrypted_data)
```
# Gateways keyed by landline area code; the first entry is preferred.
_GATEWAYS_BY_AREA_CODE = {
    '010': ['gateway-bj'],
    '020': ['gateway-gz'],
    # other regions...
}


def select_best_gateway(phone_number):
    """Pick the gateway for a number based on its area-code prefix.

    Area codes are 3 or 4 digits long, so the 4-digit prefix is tried first
    and the 3-digit prefix second. A fixed 4-character slice can never match
    the 3-character keys, which would route every call to the default.

    Args:
        phone_number: Dialled number as a string, including the area code.

    Returns:
        Name of the first gateway registered for the matching area code, or
        ``'default-gateway'`` when no prefix matches.
    """
    for prefix_len in (4, 3):
        candidates = _GATEWAYS_BY_AREA_CODE.get(phone_number[:prefix_len])
        if candidates:
            return candidates[0]
    return 'default-gateway'
本文详细阐述了使用Python搭建外呼系统的完整方案,从底层协议实现到上层业务逻辑,涵盖了性能优化、安全防护等关键环节。实际部署时,建议先进行小规模测试,逐步验证各模块功能,再根据业务需求进行定制开发。系统上线后需建立完善的监控体系,确保7×24小时稳定运行。