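Audio/Video Project: RTMP Streaming Media Server Development Guide
Project Overview
This project is a high-performance RTMP streaming media server built on the muduo network library. It supports:
- ✅ H.264 + AAC audio/video codecs
- ✅ Multiple concurrent publish and play streams
- ✅ GOP caching for a fast first frame
- ✅ Audio/video synchronization
- ✅ Low-latency optimizations
- ✅ Timestamp mapping to keep playback continuous
Technology Stack
- Network framework: muduo (high-performance network library)
- Protocol: RTMP 1.0 specification
- Audio/video formats: H.264 + AAC
- Language: C++17
- Build tool: CMake
Video walkthrough and source code: https://www.bilibili.com/video/BV1mou4zhEFX/
Quick Start
Build and Run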
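# 1. Build the RTMP server
mkdir build && cd build
# Configure CMake (Debug build, convenient for debugging)
cmake -DCMAKE_BUILD_TYPE=Debug ..
# Or use the default (Debug mode)
cmake ..
# Or build a Release version (for production)
cmake -DCMAKE_BUILD_TYPE=Release ..
# Compile using all CPU cores
make -j$(nproc)

# 2. Edit the configuration file (optional; the defaults work)
# Default path: ../config.json relative to the build directory, i.e. the project source root

# 3. Run the program
# Option 1: run directly; the server loads ../config.json by default
./bin/rtmp_server
# Option 2: specify a configuration file
./bin/rtmp_server --config=/path/to/your/config.json

# 4. Test (in another terminal)
# Publish:
ffmpeg -re -i test_video.mp4 -c copy -f flv rtmp://localhost:1935/live/test
# Play:
ffplay rtmp://localhost:1935/live/test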
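System Architecture
Overall Architecture Diagram
Core Module Responsibilities
Module | Responsibility | Files |
RtmpServer | Main server framework; manages TCP connections and configuration | rtmp_server.h/cc |
RtmpConnection | Handles a single RTMP connection (handshake, commands, media data) | rtmp_connection.h/cc |
StreamManager | Stream management; coordinates publishers and subscribers | stream_manager.h/cc |
RtmpProtocolParser | RTMP protocol parsing and message serialization | rtmp_protocol.h/cc |
GopCache | GOP cache for a fast first frame | gop_cache.h/cc |
AmfCodec | AMF data encoding and decoding | rtmp_protocol.h/cc |
Core Modules in Detail
1. RtmpServer: Main Server Framework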
class RtmpServer {
    // Core functions
    void start();          // start the server
    void stop();           // stop the server
    void onConnection();   // handle a new connection
    void onMessage();      // handle incoming data
    // Configuration
    RtmpServerConfig config_;  // server configuration
    // Connection management
    std::unordered_map<std::string,
                       std::shared_ptr<RtmpConnection>> connections_;
    // Stream management
    std::shared_ptr<StreamManager> streamManager_;
};
Key configuration parameters:
struct RtmpServerConfig {
    std::string listenAddress = "0.0.0.0";
    uint16_t listenPort = 1935;
    int maxConnections = 1000;
    int maxStreams = 100;
    int gopCacheMaxCount = 1;                    // number of cached GOPs (kept low to reduce latency)
    int gopCacheMaxSizeBytes = 2 * 1024 * 1024;  // GOP cache size limit: 2 MB
    int chunkSize = 60000;                       // RTMP chunk size in bytes
};
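A chunk size other than the protocol default of 128 bytes only takes effect once it has been announced to the peer with a Set Chunk Size message (type 1). The sketch below shows how that 4-byte payload could be built for a value such as the `chunkSize` above; `encodeSetChunkSize` is a hypothetical helper introduced for illustration, not a function from the project.

```cpp
#include <array>
#include <cstdint>
#include <stdexcept>

// Payload of a Set Chunk Size message: a 31-bit chunk size in big-endian
// order, with the most significant bit always zero.
// Hypothetical helper for illustration only.
std::array<uint8_t, 4> encodeSetChunkSize(uint32_t chunkSize) {
    if (chunkSize == 0 || chunkSize > 0x7FFFFFFFu) {
        throw std::invalid_argument("chunk size must be in [1, 2^31 - 1]");
    }
    return {static_cast<uint8_t>(chunkSize >> 24),
            static_cast<uint8_t>((chunkSize >> 16) & 0xFF),
            static_cast<uint8_t>((chunkSize >> 8) & 0xFF),
            static_cast<uint8_t>(chunkSize & 0xFF)};
}
```

For the default configuration value 60000 (0xEA60) this yields the bytes 00 00 EA 60.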
2. RtmpConnection: Connection Handling
class RtmpConnection {
    // Connection role
    enum class RtmpConnectionType {
        kUnknown,
        kPublisher,   // publishing (push) connection
        kSubscriber   // playing (pull) connection
    };
    // Core processing flow
    void onMessage();         // receive TCP data
    bool handleHandshake();   // RTMP handshake
    void processMessages();   // process RTMP messages
    void processCommand();    // process RTMP commands
    void processAudioData();  // process audio data
    void processVideoData();  // process video data
    void processMetaData();   // process metadata
};
3. StreamManager: Stream Manager
class StreamManager {
    // Stream management
    std::shared_ptr<Stream> createStream(const std::string& name);
    std::shared_ptr<Stream> getStream(const std::string& name);
    // Publisher management
    std::shared_ptr<Publisher> createPublisher(const std::string& streamName,
                                               TcpConnectionPtr conn);
    // Subscriber management
    std::shared_ptr<Subscriber> createSubscriber(const std::string& streamName,
                                                 TcpConnectionPtr conn);
    // Data forwarding
    void onPublisherData(const std::string& streamName,
                         std::shared_ptr<MediaPacket> packet);
private:
    std::unordered_map<std::string, std::shared_ptr<Stream>> streams_;
};
RTMP Protocol Implementation
RTMP Protocol Stack
+---------------------------+
|     Application Layer     |  <- RTMP Commands, Media Data
+---------------------------+
|       RTMP Messages       |  <- Audio/Video/Command Messages
+---------------------------+
|        RTMP Chunks        |  <- Messages split into chunks
+---------------------------+
|            TCP            |  <- Reliable transport
+---------------------------+
1. Message Type Definitions
enum class RtmpMessageType : uint8_t {
    kSetChunkSize = 1,      // Set Chunk Size
    kAbort = 2,             // Abort Message
    kAck = 3,               // Acknowledgement
    kUserControl = 4,       // User Control Message
    kWindowAckSize = 5,     // Window Acknowledgement Size
    kSetPeerBandwidth = 6,  // Set Peer Bandwidth
    kAudio = 8,             // audio data
    kVideo = 9,             // video data
    kDataAMF0 = 18,         // AMF0 data message
    kCommandAMF0 = 20,      // AMF0 command message
};
2. Chunk Format
enum class RtmpChunkFormat : uint8_t {
    kType0 = 0,  // full message header (11 bytes)
    kType1 = 1,  // no message stream ID (7 bytes)
    kType2 = 2,  // timestamp delta only (3 bytes)
    kType3 = 3   // no message header (0 bytes)
};
Chunk header layout:
Type 0: [Basic Header][Timestamp][Message Length][Message Type ID][Message Stream ID]
Type 1: [Basic Header][Timestamp Delta][Message Length][Message Type ID]
Type 2: [Basic Header][Timestamp Delta]
Type 3: [Basic Header]
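Every layout above begins with a Basic Header, which itself takes 1, 2, or 3 bytes depending on the chunk stream ID. The following sketch encodes it according to the RTMP chunking rules; `encodeBasicHeader` is a hypothetical name used for illustration and is not taken from the project source.

```cpp
#include <cstdint>
#include <stdexcept>
#include <vector>

// Encodes the RTMP chunk Basic Header for a chunk format (0-3) and a
// chunk stream ID (2-65599). Hypothetical helper for illustration only.
std::vector<uint8_t> encodeBasicHeader(uint8_t fmt, uint32_t csid) {
    if (fmt > 3 || csid < 2 || csid > 65599) {
        throw std::invalid_argument("fmt or chunk stream ID out of range");
    }
    const uint8_t fmtBits = static_cast<uint8_t>(fmt << 6);
    if (csid <= 63) {
        // 1-byte form: fmt (2 bits) + chunk stream ID (6 bits)
        return {static_cast<uint8_t>(fmtBits | csid)};
    }
    const uint32_t rest = csid - 64;
    if (csid <= 319) {
        // 2-byte form: low 6 bits are 0, second byte holds csid - 64
        return {fmtBits, static_cast<uint8_t>(rest)};
    }
    // 3-byte form: low 6 bits are 1, then csid - 64 with the low byte first
    return {static_cast<uint8_t>(fmtBits | 1),
            static_cast<uint8_t>(rest & 0xFF),
            static_cast<uint8_t>(rest >> 8)};
}
```

Note that in the 3-byte form the second byte is the low byte and the third byte is the high byte, so the ID is recovered as third * 256 + second + 64.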
3. AMF Encoding and Decoding
class AmfCodec {
    // Scalar value encoding/decoding
    static void encodeAmfValue(Buffer* buffer, const AmfValue& value);
    static bool decodeAmfValue(Buffer* buffer, AmfValue* value);
    // Compound type encoding/decoding
    static void encodeAmfObject(Buffer* buffer,
                                const std::unordered_map<std::string, AmfValue>& object);
    static bool decodeAmfObject(Buffer* buffer,
                                std::unordered_map<std::string, AmfValue>* object);
};
AMF data types:
enum class AmfType : uint8_t {
    kNumber = 0x00,       // 64-bit double-precision float
    kBoolean = 0x01,      // boolean
    kString = 0x02,       // UTF-8 string
    kObject = 0x03,       // object
    kNull = 0x05,         // null
    kUndefined = 0x06,    // undefined
    kEcmaArray = 0x08,    // ECMA array
    kObjectEnd = 0x09,    // object end marker
    kStrictArray = 0x0A,  // strict array
};
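To make the type markers concrete, here is a minimal sketch of how an AMF0 Number and String are laid out on the wire. It writes into a plain `std::string` rather than the project's `Buffer`, and the function names `appendAmf0Number` and `appendAmf0String` are assumptions made for this example.

```cpp
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <string>

// Appends an AMF0 Number: marker 0x00 followed by an IEEE-754 double in
// big-endian byte order.
void appendAmf0Number(std::string& out, double value) {
    static_assert(sizeof(double) == 8, "AMF0 numbers are 64-bit doubles");
    uint64_t bits = 0;
    std::memcpy(&bits, &value, sizeof(bits));
    out.push_back('\x00');
    for (int shift = 56; shift >= 0; shift -= 8) {
        out.push_back(static_cast<char>((bits >> shift) & 0xFF));
    }
}

// Appends an AMF0 String: marker 0x02, a 16-bit big-endian length, then the
// UTF-8 bytes. Longer strings need the long-string type (marker 0x0C).
void appendAmf0String(std::string& out, const std::string& value) {
    if (value.size() > 0xFFFF) {
        throw std::length_error("string too long for the AMF0 short string type");
    }
    out.push_back('\x02');
    out.push_back(static_cast<char>((value.size() >> 8) & 0xFF));
    out.push_back(static_cast<char>(value.size() & 0xFF));
    out += value;
}
```

Encoding the string "play" produces 02 00 04 70 6C 61 79, and the number 1.0 produces 00 3F F0 00 00 00 00 00 00.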
Publishing (Push) Implementation in Detail
Publish Sequence Diagram
Core Publish Code Flow
1. Handling the publish command
void RtmpConnection::handlePublish(const RtmpCommand& command) {
    // Parse the stream name and publish type
    std::string streamName = command.getStringArg(0);
    std::string publishType = command.getStringArg(1);  // "live"
    // Both owners are held weakly; stop if either has already been destroyed
    auto mgr = streamManager_.lock();
    auto conn = tcpConnection_.lock();
    if (!mgr || !conn) return;
    // Mark this connection as a publisher
    connectionType_ = RtmpConnectionType::kPublisher;
    streamName_ = streamName;
    // Create the Publisher object
    publisher_ = mgr->createPublisher(streamName, conn);
    // Start publishing
    publisher_->startPublish();
    // Send the responses
    sendOnFCPublish("NetStream.Publish.Start", "Started publishing stream.");
    sendPublishStatus("NetStream.Publish.Start", "Started publishing stream.");
    // Update the connection state
    parser_.setState(RtmpConnectionState::kPublishing);
}
2. Handling media data
void RtmpConnection::processAudioData(const RtmpMessage& message) {
    if (!isPublisher() || !publisher_) return;
    // Wrap the payload in a MediaPacket
    auto packet = std::make_shared<MediaPacket>(
        MediaPacketType::kAudio,
        message.payload(),
        message.header().timestamp,
        message.header().messageStreamId
    );
    // Hand it to the Publisher
    publisher_->onMediaData(packet);
}
void RtmpConnection::processVideoData(const RtmpMessage& message) {
    if (!isPublisher() || !publisher_) return;
    // Wrap the payload in a MediaPacket
    auto packet = std::make_shared<MediaPacket>(
        MediaPacketType::kVideo,
        message.payload(),
        message.header().timestamp,
        message.header().messageStreamId
    );
    // Hand it to the Publisher
    publisher_->onMediaData(packet);
}
3. Publisher data forwarding
void Publisher::onMediaData(std::shared_ptr<MediaPacket> packet) {
    if (!packet || !isPublishing_) return;
    updateLastActiveTime();
    updateStats(packet->data().size(), 1);
    // Forward to the StreamManager
    if (auto mgr = manager_.lock()) {
        mgr->onPublisherData(streamName_, packet);
    }
}
Key Implementation Details
1. Sequence header detection and caching
// Detect an AVC sequence header (AVCPacketType == 0)
bool isAvcSequenceHeader(const MediaPacket& packet) {
    return packet.type() == MediaPacketType::kVideo &&
           packet.videoCodec() == VideoCodec::kAVC &&
           packet.data().size() > 1 &&
           static_cast<uint8_t>(packet.data()[1]) == 0;
}
// Detect an AAC sequence header (AACPacketType == 0)
bool isAacSequenceHeader(const MediaPacket& packet) {
    return packet.type() == MediaPacketType::kAudio &&
           packet.audioFormat() == AudioFormat::kAAC &&
           packet.data().size() > 1 &&
           static_cast<uint8_t>(packet.data()[1]) == 0;
}
2. Key frame detection
bool MediaPacket::isKeyFrame() const {
    if (type_ != MediaPacketType::kVideo || data_.empty()) {
        return false;
    }
    // FLV video tag: Frame Type (high 4 bits) + Codec ID (low 4 bits)
    uint8_t firstByte = static_cast<uint8_t>(data_[0]);
    uint8_t frameType = (firstByte >> 4) & 0x0F;
    return frameType == 1;  // 1 = key frame
}
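One detail worth keeping in mind: in FLV, an AVC sequence header is also tagged with frame type 1, so a check on the first byte alone cannot tell it apart from a real key frame. The sketch below parses the first two payload bytes and separates the two cases. `VideoTagInfo`, `parseAvcVideoTag`, and `isDecodableKeyFrame` are hypothetical names for this example, and whether the project needs this distinction depends on where `isKeyFrame()` is called.

```cpp
#include <cstdint>
#include <optional>
#include <string>

struct VideoTagInfo {
    uint8_t frameType;      // 1 = key frame, 2 = inter frame
    uint8_t codecId;        // 7 = AVC (H.264)
    uint8_t avcPacketType;  // 0 = sequence header, 1 = NALU, 2 = end of sequence
};

// Parses the first two bytes of an FLV video payload that carries AVC.
// Returns std::nullopt if the payload is too short or the codec is not AVC.
std::optional<VideoTagInfo> parseAvcVideoTag(const std::string& payload) {
    if (payload.size() < 2) return std::nullopt;
    const uint8_t first = static_cast<uint8_t>(payload[0]);
    VideoTagInfo info{};
    info.frameType = static_cast<uint8_t>(first >> 4);
    info.codecId = static_cast<uint8_t>(first & 0x0F);
    if (info.codecId != 7) return std::nullopt;
    info.avcPacketType = static_cast<uint8_t>(payload[1]);
    return info;
}

// A decodable key frame has the key frame type AND carries NALU data.
bool isDecodableKeyFrame(const VideoTagInfo& info) {
    return info.frameType == 1 && info.avcPacketType == 1;
}
```

A payload starting with 0x17 0x00 is a sequence header, 0x17 0x01 is a key frame, and 0x27 0x01 is an inter frame.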
Playback (Pull) Implementation in Detail
Play Sequence Diagram
Core Play Code Flow
1. Handling the play command
void RtmpConnection::handlePlay(const RtmpCommand& command) {
    std::string streamName = command.getStringArg(0);
    // Both owners are held weakly; stop if either has already been destroyed
    auto mgr = streamManager_.lock();
    auto conn = tcpConnection_.lock();
    if (!mgr || !conn) return;
    // Mark this connection as a subscriber
    connectionType_ = RtmpConnectionType::kSubscriber;
    streamName_ = streamName;
    // Create the Subscriber object
    subscriber_ = mgr->createSubscriber(streamName, conn);
    // Send the standard RTMP play response sequence
    uint32_t streamId = 1;
    // 1. StreamBegin user control message
    sendStreamBegin(streamId);
    // 2. NetStream.Play.Reset
    sendPlayStatus("NetStream.Play.Reset", "Playing and resetting stream.");
    // 3. NetStream.Play.Start
    sendPlayStatus("NetStream.Play.Start", "Started playing stream.");
    // 4. RtmpSampleAccess
    sendRtmpSampleAccess();
    // 5. NetStream.Data.Start
    sendDataStart();
    // Start playback
    subscriber_->startPlay();
    parser_.setState(RtmpConnectionState::kPlaying);
}
2. Starting playback in Subscriber
void Subscriber::startPlay() {
    if (!isPlaying_) {
        isPlaying_ = true;
        startTime_ = Timestamp::now();
        // Reset the timestamp mapper
        timeStampMapper_.reset();
        // Send cached data
        if (auto mgr = manager_.lock()) {
            auto stream = mgr->getStream(streamName_);
            if (stream) {
                // 1. Sequence headers (required before any frame can be decoded)
                auto sequenceHeaders = stream->getSequenceHeaders();
                if (!sequenceHeaders.empty()) {
                    sendCachedData(sequenceHeaders);
                }
                // 2. GOP cache (optional, gives a fast first frame)
                auto gopPackets = stream->gopCache()->getLatestKeyFramePackets();
                if (!gopPackets.empty()) {
                    sendCachedData(gopPackets);
                }
            }
        }
    }
}
3. Live data broadcast
void Stream::broadcastToSubscribers(std::shared_ptr<MediaPacket> packet) {
    std::vector<std::shared_ptr<Subscriber>> subscribersCopy;
    // Snapshot the subscriber list so it cannot change while we are sending
    {
        MutexLockGuard lock(mutex_);
        subscribersCopy.reserve(subscribers_.size());
        for (const auto& pair : subscribers_) {
            if (pair.second && pair.second->isConnected()) {
                subscribersCopy.push_back(pair.second);
            }
        }
    }
    // Send outside the lock
    for (auto& subscriber : subscribersCopy) {
        subscriber->sendMediaData(packet);
    }
}
Key Implementation Details
1. Sequence header management
// The Stream class stores the sequence headers separately from the GOP cache
class Stream {
private:
    std::shared_ptr<MediaPacket> avcSequenceHeader_;  // H.264 configuration
    std::shared_ptr<MediaPacket> aacSequenceHeader_;  // AAC configuration
    std::shared_ptr<MediaPacket> metadataPacket_;     // metadata
public:
    std::vector<std::shared_ptr<MediaPacket>> getSequenceHeaders() const {
        std::vector<std::shared_ptr<MediaPacket>> headers;
        MutexLockGuard lock(mutex_);
        // Order matters: metadata first, then video and audio configuration
        if (metadataPacket_) headers.push_back(metadataPacket_);
        if (avcSequenceHeader_) headers.push_back(avcSequenceHeader_);
        if (aacSequenceHeader_) headers.push_back(aacSequenceHeader_);
        return headers;
    }
};
2. Play response message formats
// StreamBegin user control message
void RtmpConnection::sendStreamBegin(uint32_t streamId) {
    Buffer buffer;
    buffer.appendInt16(0x0000);  // event type: StreamBegin
    buffer.appendInt32(streamId);
    RtmpMessageHeader header;
    header.messageTypeId = static_cast<uint8_t>(RtmpMessageType::kUserControl);
    header.messageLength = static_cast<uint32_t>(buffer.readableBytes());
    RtmpMessage message(header, std::string(buffer.peek(), buffer.readableBytes()));
    sendMessage(message);
}
// onStatus message
void RtmpConnection::sendPlayStatus(const std::string& code, const std::string& description) {
    Buffer buffer;
    // Command name: onStatus
    AmfCodec::encodeAmfValue(&buffer, AmfValue("onStatus"));
    // Transaction ID: 0
    AmfCodec::encodeAmfValue(&buffer, AmfValue(0.0));
    // Command object: null
    AmfCodec::encodeAmfValue(&buffer, AmfValue());
    // Info object
    std::unordered_map<std::string, AmfValue> info;
    info["level"] = AmfValue("status");
    info["code"] = AmfValue(code);
    info["description"] = AmfValue(description);
    AmfCodec::encodeAmfObject(&buffer, info);
    RtmpMessageHeader header;
    header.messageTypeId = static_cast<uint8_t>(RtmpMessageType::kCommandAMF0);
    header.messageLength = static_cast<uint32_t>(buffer.readableBytes());
    header.messageStreamId = 1;
    RtmpMessage message(header, std::string(buffer.peek(), buffer.readableBytes()));
    sendMessage(message);
}
Multi-Stream Handling
Multi-Publisher Architecture
Multi-Subscriber Architecture
Core Implementation
1. Stream manager
class StreamManager {
private:
    std::unordered_map<std::string, std::shared_ptr<Stream>> streams_;
public:
    // Return the existing stream or create it
    std::shared_ptr<Stream> getOrCreateStream(const std::string& streamName) {
        MutexLockGuard lock(mutex_);
        auto it = streams_.find(streamName);
        if (it != streams_.end()) {
            return it->second;
        }
        // Create a new stream
        auto stream = std::make_shared<Stream>(streamName);
        stream->initialize();
        streams_[streamName] = stream;
        return stream;
    }
    // Data forwarding
    void onPublisherData(const std::string& streamName,
                         std::shared_ptr<MediaPacket> packet) {
        auto stream = getStream(streamName);
        if (stream) {
            stream->onMediaData(packet);  // broadcast to all subscribers
        }
    }
};
2. Subscriber management
class Stream {
private:
std::unordered_map<std::string, std::shared_ptr<Subscriber>> subscribers_;
public:
void addSubscriber(std::shared_ptr<Subscriber> subscriber) {
MutexLockGuard lock(mutex_);
subscribers_[subscriber->id()] = subscriber;
LOG_INFO << "Added subscriber: " << subscriber->id()
<< " to stream: " << name_
<< ", total subscribers: " << subscribers_.size();
}
void removeSubscriber(const std::string& subscriberId) {
MutexLockGuard lock(mutex_);
auto it = subscribers_.find(subscriberId);
if (it != subscribers_.end()) {
subscribers_.erase(it);
LOG_INFO << "Removed subscriber: " << subscriberId
<< " from stream: " << name_
<< ", remaining subscribers: " << subscribers_.size();
}
}
};
3. Broadcasting under concurrency
void Stream::broadcastToSubscribers(std::shared_ptr<MediaPacket> packet) {
    // Work on a snapshot so the lock is held only while copying
    std::vector<std::shared_ptr<Subscriber>> subscribersCopy;
    {
        MutexLockGuard lock(mutex_);
        subscribersCopy.reserve(subscribers_.size());
        for (const auto& pair : subscribers_) {
            if (pair.second && pair.second->isConnected()) {
                subscribersCopy.push_back(pair.second);
            }
        }
    }
    // Send to each subscriber in turn, outside the lock, so subscribers can
    // be added or removed by other threads while data is being sent
    for (auto& subscriber : subscribersCopy) {
        subscriber->sendMediaData(packet);
    }
}
Performance Optimization
1. Connection management
class RtmpServer {
private:
    std::unordered_map<std::string, std::shared_ptr<RtmpConnection>> connections_;
    std::atomic<int> connectionCount_{0};
public:
    void onConnection(const TcpConnectionPtr& conn) {
        if (connectionCount_.load() >= config_.maxConnections) {
            LOG_WARN << "Max connections reached, rejecting new connection";
            conn->shutdown();
            return;
        }
        // Create the RTMP connection
        auto rtmpConn = std::make_shared<RtmpConnection>(conn, streamManager_);
        connections_[rtmpConn->id()] = rtmpConn;
        connectionCount_.fetch_add(1);
    }
    void onDisconnection(const TcpConnectionPtr& conn) {
        connectionCount_.fetch_sub(1);
        // Clean up the connection
    }
};
2. Memory optimization
// Use an object pool to reduce memory allocations
class MediaPacketPool {
private:
    std::queue<std::shared_ptr<MediaPacket>> pool_;
    std::mutex mutex_;
public:
    std::shared_ptr<MediaPacket> acquire() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!pool_.empty()) {
            auto packet = pool_.front();
            pool_.pop();
            return packet;
        }
        return std::make_shared<MediaPacket>();
    }
    // Precondition: the caller holds the last reference. Resetting a packet
    // that a subscriber still holds would corrupt data that is being sent.
    void release(std::shared_ptr<MediaPacket> packet) {
        packet->reset();
        std::lock_guard<std::mutex> lock(mutex_);
        pool_.push(packet);
    }
};
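Because a packet is shared with every subscriber, it is hard for any single caller to know when `release()` is safe. One possible alternative, shown here only as a sketch, is to let the `shared_ptr` deleter return the packet once the last reference goes away. `Packet` is a stand-in for `MediaPacket`, and `PacketPool` with its `idleCount()` method is hypothetical; the pool must be created with `std::make_shared` for `weak_from_this()` to work.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

struct Packet {  // stand-in for MediaPacket
    std::string data;
    void reset() { data.clear(); }
};

class PacketPool : public std::enable_shared_from_this<PacketPool> {
public:
    // Hands out a packet whose deleter returns it to the pool, or frees it
    // if the pool has already been destroyed.
    std::shared_ptr<Packet> acquire() {
        std::unique_ptr<Packet> raw;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!free_.empty()) {
                raw = std::move(free_.back());
                free_.pop_back();
            }
        }
        if (!raw) raw = std::make_unique<Packet>();
        std::weak_ptr<PacketPool> weakPool = weak_from_this();
        return std::shared_ptr<Packet>(raw.release(), [weakPool](Packet* p) {
            if (auto pool = weakPool.lock()) {
                pool->giveBack(std::unique_ptr<Packet>(p));
            } else {
                delete p;
            }
        });
    }

    // Number of packets currently waiting for reuse.
    std::size_t idleCount() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return free_.size();
    }

private:
    void giveBack(std::unique_ptr<Packet> p) {
        p->reset();
        std::lock_guard<std::mutex> lock(mutex_);
        free_.push_back(std::move(p));
    }

    mutable std::mutex mutex_;
    std::vector<std::unique_ptr<Packet>> free_;
};
```

With this design a packet goes back to the pool only after the publisher, the GOP cache, and every subscriber have dropped their references, so no explicit `release()` call is needed.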
GOP Cache Mechanism
GOP Cache Architecture
GOP Cache Implementation
1. Core data structures
class GopCache {
private:
    std::shared_ptr<Gop> currentGop_;  // current GOP
    size_t maxCacheSize_;              // maximum cache size
    size_t currentCacheSize_;          // current cache size
    // Buffer used to align audio with key frames
    std::deque<std::shared_ptr<MediaPacket>> audioBuffer_;
    uint32_t lastKeyFrameTimestamp_ = 0;
    static const uint32_t AUDIO_ALIGN_WINDOW = 200;  // 200 ms alignment window
public:
    void addPacket(std::shared_ptr<MediaPacket> packet);
    std::vector<std::shared_ptr<MediaPacket>> getCachedPackets() const;
    std::vector<std::shared_ptr<MediaPacket>> getLatestKeyFramePackets() const;
};
2. GOP detection and switching
bool GopCache::needNewGop(std::shared_ptr<MediaPacket> packet) const {
    // There is no current GOP yet
    if (!currentGop_) {
        return true;
    }
    // A new key frame starts a new GOP
    if (packet->type() == MediaPacketType::kVideo && packet->isKeyFrame()) {
        return true;
    }
    return false;
}
void GopCache::addPacket(std::shared_ptr<MediaPacket> packet) {
    MutexLockGuard lock(mutex_);
    // Audio frame: keep the audio buffer up to date
    if (packet->type() == MediaPacketType::kAudio) {
        maintainAudioBuffer(packet);
    }
    // Check whether a new GOP is needed
    bool isNewGop = needNewGop(packet);
    if (isNewGop) {
        startNewGop();
        // Key frame: align buffered audio with it
        if (packet->type() == MediaPacketType::kVideo && packet->isKeyFrame()) {
            lastKeyFrameTimestamp_ = packet->timestamp();
            alignAudioWithKeyFrame(lastKeyFrameTimestamp_);
        }
    }
    // Add the packet to the current GOP
    if (!currentGop_) {
        startNewGop();
    }
    currentGop_->addPacket(packet);
    updateCacheSize();
}
3. Audio/video alignment
void GopCache::alignAudioWithKeyFrame(uint32_t keyFrameTimestamp) {
    if (audioBuffer_.empty() || !currentGop_) {
        return;
    }
    std::vector<std::shared_ptr<MediaPacket>> alignedAudio;
    // Look for audio frames around the key frame
    for (const auto& audioPacket : audioBuffer_) {
        uint32_t audioTimestamp = audioPacket->timestamp();
        // Subtract as unsigned first so the result stays correct when the
        // 32-bit timestamp wraps around, then reinterpret as signed
        int32_t timeDiff = static_cast<int32_t>(audioTimestamp - keyFrameTimestamp);
        // Collect audio from 200 ms before to 50 ms after the key frame
        if (timeDiff >= -static_cast<int32_t>(AUDIO_ALIGN_WINDOW) &&
            timeDiff <= 50) {
            alignedAudio.push_back(audioPacket);
        }
    }
    // Sort by timestamp
    std::sort(alignedAudio.begin(), alignedAudio.end(),
              [](const std::shared_ptr<MediaPacket>& a,
                 const std::shared_ptr<MediaPacket>& b) {
                  return a->timestamp() < b->timestamp();
              });
    // Append to the freshly started GOP, so the audio precedes the key frame
    for (const auto& audioPacket : alignedAudio) {
        currentGop_->addPacket(audioPacket);
    }
    LOG_INFO << "Aligned " << alignedAudio.size()
             << " audio frames with key frame at " << keyFrameTimestamp;
}
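The window test above depends on computing a signed difference between two unsigned 32-bit timestamps. A minimal standalone sketch of that calculation follows; `timestampDiff` and `inAlignWindow` are hypothetical helper names, and the conversion to `int32_t` assumes two's complement representation, which is guaranteed from C++20 and holds on mainstream compilers before that.

```cpp
#include <cstdint>

// Signed distance from `reference` to `ts` in milliseconds. The subtraction
// is done modulo 2^32, so the result stays correct across a timestamp wrap.
int32_t timestampDiff(uint32_t ts, uint32_t reference) {
    return static_cast<int32_t>(ts - reference);
}

// True if an audio packet lies within
// [keyFrameTs - windowBeforeMs, keyFrameTs + windowAfterMs].
bool inAlignWindow(uint32_t audioTs, uint32_t keyFrameTs,
                   int32_t windowBeforeMs = 200, int32_t windowAfterMs = 50) {
    const int32_t diff = timestampDiff(audioTs, keyFrameTs);
    return diff >= -windowBeforeMs && diff <= windowAfterMs;
}
```

For example, an audio packet at timestamp 10 is 26 ms after a key frame at 0xFFFFFFF0, so it still falls inside the window even though the raw value is smaller.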
4. Fast first-frame optimization
std::vector<std::shared_ptr<MediaPacket>>
GopCache::getLatestKeyFramePackets() const {
    MutexLockGuard lock(mutex_);
    std::vector<std::shared_ptr<MediaPacket>> packets;
    if (currentGop_ && !currentGop_->empty()) {
        const auto& gopPackets = currentGop_->packets();
        // Locate the key frame
        size_t keyFrameIndex = 0;
        bool foundKeyFrame = false;
        for (size_t i = 0; i < gopPackets.size(); ++i) {
            if (gopPackets[i]->type() == MediaPacketType::kVideo &&
                gopPackets[i]->isKeyFrame()) {
                keyFrameIndex = i;
                foundKeyFrame = true;
                break;
            }
        }
        if (foundKeyFrame) {
            // Starting at the key frame, take at most 10 packets. How much
            // playback time this covers depends on the frame rate and on how
            // many audio packets are interleaved.
            size_t endIndex = std::min(keyFrameIndex + 10, gopPackets.size());
            for (size_t i = keyFrameIndex; i < endIndex; ++i) {
                packets.push_back(gopPackets[i]);
            }
        }
    }
    return packets;
}
GOP Cache Statistics
struct GopInfo {
    uint32_t audioDuration = 0;        // audio duration (ms)
    uint32_t videoDuration = 0;        // video duration (ms)
    size_t audioPackets = 0;           // number of audio packets
    size_t videoPackets = 0;           // number of video packets
    size_t totalSize = 0;              // total size in bytes
    uint32_t firstTimestamp = 0;       // first timestamp
    uint32_t lastTimestamp = 0;        // last timestamp
    bool hasKeyFrame = false;          // whether the GOP contains a key frame
    uint32_t audioSampleRate = 44100;  // audio sample rate (Hz)
    double audioFrameDuration = 0.0;   // duration of one audio frame (ms)
};
GopInfo Gop::getGopInfo() const {
    GopInfo info;
    if (packets_.empty()) {
        return info;
    }
    // Count audio/video packets and track the timestamp range
    uint32_t minTimestamp = packets_[0]->timestamp();
    uint32_t maxTimestamp = packets_[0]->timestamp();
    for (const auto& packet : packets_) {
        uint32_t timestamp = packet->timestamp();
        minTimestamp = std::min(minTimestamp, timestamp);
        maxTimestamp = std::max(maxTimestamp, timestamp);
        info.totalSize += packet->data().size();
        if (packet->type() == MediaPacketType::kAudio) {
            info.audioPackets++;
        } else if (packet->type() == MediaPacketType::kVideo) {
            info.videoPackets++;
            if (packet->isKeyFrame()) {
                info.hasKeyFrame = true;
            }
        }
    }
    info.firstTimestamp = minTimestamp;
    info.lastTimestamp = maxTimestamp;
    // Audio frame duration, assuming a 44.1 kHz sample rate and 1024 samples
    // per frame (the sample rate is hard-coded here, not read from the stream)
    info.audioFrameDuration = 1024.0 / 44100.0 * 1000.0;  // about 23.22 ms
    info.audioDuration = static_cast<uint32_t>(info.audioPackets * info.audioFrameDuration);
    // Note: videoDuration is not filled in by this function
    return info;
}
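The 23.22 ms figure comes from dividing the samples per frame by the sample rate. The sketch below generalizes that arithmetic to other sample rates; both function names are hypothetical, and the default of 1024 samples per frame applies to AAC-LC (other AAC profiles may differ). The caller is assumed to pass a non-zero sample rate.

```cpp
#include <cstdint>

// Duration of one AAC frame in milliseconds:
// samplesPerFrame / sampleRateHz * 1000.
double aacFrameDurationMs(uint32_t sampleRateHz, uint32_t samplesPerFrame = 1024) {
    return static_cast<double>(samplesPerFrame) * 1000.0 / sampleRateHz;
}

// Total duration of `packetCount` audio frames, rounded to the nearest
// millisecond.
uint32_t aacDurationMs(uint64_t packetCount, uint32_t sampleRateHz) {
    return static_cast<uint32_t>(packetCount * aacFrameDurationMs(sampleRateHz) + 0.5);
}
```

At 48 kHz one frame lasts about 21.33 ms, so using the hard-coded 44.1 kHz value on a 48 kHz stream would overestimate the audio duration by roughly 9%.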
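Timestamp Synchronization
Timestamp Mapper
Timestamp Mapping Implementation
1. Timestamp mapper structure
struct TimestampMapper {
    // Shared base timestamp
    uint32_t baseTimestamp = 0;
    bool initialized = false;
    // Audio synchronization
    uint32_t audioBaseTimestamp = 0;
    uint32_t audioFrameNum = 0;
    uint32_t audioFrameInterval = 0;  // computed dynamically
    bool audioInitialized = false;
    // Video synchronization
    uint32_t videoBaseTimestamp = 0;
    uint32_t lastVideoTimestamp = 0;
    bool videoInitialized = false;
    // Synchronization threshold
    static const uint32_t SYNC_THRESHOLD = 300;  // 300 ms
    // Other bookkeeping members used by the functions below (for example
    // lastAudioTimestamp and audioFrameIntervalCalculated) are omitted here
    uint32_t mapTimestamp(uint32_t originalTimestamp, MediaPacketType type);
    void reset();
};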
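2. Audio timestamp mapping
uint32_t TimestampMapper::mapAudioTimestamp(uint32_t originalTimestamp) {
    // Initialize the shared base timestamp
    initializeBase(originalTimestamp);
    if (originalTimestamp < baseTimestamp) {
        return lastAudioTimestamp;  // never let timestamps go backwards
    }
    uint32_t mappedTimestamp = originalTimestamp - baseTimestamp;
    // Update the running estimate of the audio frame interval
    calculateAudioFrameInterval(originalTimestamp);
    // Initialize the audio base. The anchor frame itself is frame 0, so the
    // next frame to arrive is frame 1.
    if (!audioInitialized) {
        audioBaseTimestamp = mappedTimestamp;
        audioFrameNum = 1;
        audioInitialized = true;
        lastAudioTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }
    // No stable frame interval yet: pass the timestamp through and keep the
    // base anchored to the latest frame, so the first smoothed timestamp
    // continues from here instead of jumping back to the initial base
    if (!audioFrameIntervalCalculated) {
        audioBaseTimestamp = mappedTimestamp;
        audioFrameNum = 1;
        lastAudioTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }
    // Expected timestamp on an ideal, evenly spaced grid
    uint32_t expectedTimestamp = audioBaseTimestamp + audioFrameNum * audioFrameInterval;
    // Signed difference, computed wrap-safe
    int32_t timeDiff = static_cast<int32_t>(mappedTimestamp - expectedTimestamp);
    // Within the threshold: snap to the expected timestamp
    if (std::abs(timeDiff) <= static_cast<int32_t>(SYNC_THRESHOLD)) {
        lastAudioTimestamp = expectedTimestamp;
        audioFrameNum++;
        return expectedTimestamp;
    } else {
        // Outside the threshold: re-anchor on the current frame
        audioBaseTimestamp = mappedTimestamp;
        audioFrameNum = 1;
        lastAudioTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }
}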
3. Video timestamp mapping
uint32_t TimestampMapper::mapVideoTimestamp(uint32_t originalTimestamp) {
    // Initialize the shared base timestamp
    initializeBase(originalTimestamp);
    if (originalTimestamp < baseTimestamp) {
        return lastVideoTimestamp;  // never let timestamps go backwards
    }
    uint32_t mappedTimestamp = originalTimestamp - baseTimestamp;
    // Initialize the video base
    if (!videoInitialized) {
        videoBaseTimestamp = mappedTimestamp;
        videoInitialized = true;
        lastVideoTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }
    // Enforce monotonic timestamps
    if (mappedTimestamp < lastVideoTimestamp) {
        // The timestamp went backwards: reuse the previous one
        return lastVideoTimestamp;
    }
    // Detect timestamp jumps
    uint32_t timeDiff = mappedTimestamp - lastVideoTimestamp;
    if (timeDiff > 2000) {  // a jump of more than 2 seconds
        LOG_WARN << "Video timestamp jump detected: " << timeDiff << "ms";
        // Either re-calibrate or clamp the jump; here the jump is clamped
        mappedTimestamp = lastVideoTimestamp + 33;  // assumes 30 fps
    }
    lastVideoTimestamp = mappedTimestamp;
    return mappedTimestamp;
}
4. Audio frame interval calculation
void TimestampMapper::calculateAudioFrameInterval(uint32_t currentTimestamp) {
    if (!audioFrameIntervalCalculated) {
        if (lastRawAudioTimestamp != 0) {
            uint32_t interval = currentTimestamp - lastRawAudioTimestamp;
            // Filter outliers (a normal audio frame interval is 10 to 50 ms)
            if (interval >= 10 && interval <= 50) {
                audioFrameIntervalSum += interval;
                audioFrameIntervalCount++;
                // Once enough samples are collected, use their average
                if (audioFrameIntervalCount >= FRAME_INTERVAL_SAMPLES) {
                    audioFrameInterval = audioFrameIntervalSum / audioFrameIntervalCount;
                    audioFrameIntervalCalculated = true;
                    LOG_INFO << "Audio frame interval calculated: " << audioFrameInterval << "ms";
                }
            }
        }
        lastRawAudioTimestamp = currentTimestamp;
    }
}
Synchronization Check
bool TimestampMapper::checkSync() const {
if (!initialized || !audioInitialized || !videoInitialized) {
return true; // initialization not finished, skip the check for now
}
// Signed audio/video timestamp difference, computed wrap-safe
int32_t avDiff = static_cast<int32_t>(lastAudioTimestamp - lastVideoTimestamp);
bool inSync = std::abs(avDiff) <= static_cast<int32_t>(SYNC_THRESHOLD);
if (!inSync) {
LOG_WARN << "Audio-Video sync issue detected: " << avDiff << "ms";
}
return inSync;
}
Key Technical Details
1. RTMP handshake implementation
class RtmpProtocolParser {
public:
    // Generate the C0+C1 handshake data
    std::string generateHandshakeC0C1() {
        std::string c0c1;
        c0c1.resize(1 + kRtmpHandshakeSize);
        // C0: protocol version
        c0c1[0] = 0x03;
        // C1: handshake data
        char* c1 = &c0c1[1];
        // Timestamp (4 bytes, big-endian)
        uint32_t timestamp = static_cast<uint32_t>(time(nullptr));
        c1[0] = (timestamp >> 24) & 0xFF;
        c1[1] = (timestamp >> 16) & 0xFF;
        c1[2] = (timestamp >> 8) & 0xFF;
        c1[3] = timestamp & 0xFF;
        // Zero field (4 bytes, all zeros in the simple handshake)
        c1[4] = c1[5] = c1[6] = c1[7] = 0x00;
        // Random data (1528 bytes)
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> dis(0, 255);
        for (int i = 8; i < kRtmpHandshakeSize; ++i) {
            c1[i] = static_cast<char>(dis(gen));
        }
        return c0c1;
    }
    // Generate the C2 handshake data
    std::string generateHandshakeC2(const std::string& s1) {
        // Reject a truncated S1 instead of reading past its end
        if (s1.size() < kRtmpHandshakeSize) {
            return std::string();
        }
        std::string c2;
        c2.resize(kRtmpHandshakeSize);
        // C2 is an echo of S1
        std::memcpy(&c2[0], s1.data(), kRtmpHandshakeSize);
        return c2;
    }
};
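The code above builds the client-side packets. For a server, the mirror-image step is to answer a received C0+C1 with S0+S1+S2. The sketch below shows one way this could look using the simple handshake, where S2 is a plain echo of C1; `buildS0S1S2` and `kHandshakeSize` are names introduced for this example and are not taken from the project source.

```cpp
#include <cstddef>
#include <cstdint>
#include <ctime>
#include <optional>
#include <random>
#include <string>

constexpr std::size_t kHandshakeSize = 1536;  // size of C1, S1, C2 and S2

// Builds S0+S1+S2 from a received C0+C1. Returns std::nullopt if the input
// is incomplete or the client asks for a version other than 3.
std::optional<std::string> buildS0S1S2(const std::string& c0c1) {
    if (c0c1.size() < 1 + kHandshakeSize || c0c1[0] != 0x03) {
        return std::nullopt;
    }
    std::string out;
    out.reserve(1 + 2 * kHandshakeSize);
    out.push_back(0x03);  // S0: protocol version

    // S1: 4-byte time, 4 zero bytes, 1528 random bytes
    std::string s1(kHandshakeSize, '\0');
    const uint32_t now = static_cast<uint32_t>(std::time(nullptr));
    for (int i = 0; i < 4; ++i) {
        s1[i] = static_cast<char>((now >> (24 - 8 * i)) & 0xFF);
    }
    std::mt19937 gen(std::random_device{}());
    std::uniform_int_distribution<int> dist(0, 255);
    for (std::size_t i = 8; i < kHandshakeSize; ++i) {
        s1[i] = static_cast<char>(dist(gen));
    }
    out += s1;

    // S2: echo of C1 (the bytes after C0)
    out.append(c0c1, 1, kHandshakeSize);
    return out;
}
```

The reply is always 3073 bytes long (1 + 1536 + 1536), and the handshake completes once the client's C2 has been received.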
2. Chunk parsing
bool RtmpProtocolParser::parseChunkHeader(Buffer* buffer, RtmpChunkHeader* header) {
    if (buffer->readableBytes() < 1) {
        return false;
    }
    // Parse the Basic Header
    uint8_t basicHeader = static_cast<uint8_t>(buffer->peekInt8());
    header->format = static_cast<RtmpChunkFormat>((basicHeader >> 6) & 0x03);
    uint32_t chunkStreamId = basicHeader & 0x3F;
    // Handle the extended chunk stream ID forms
    size_t basicHeaderSize = 1;
    if (chunkStreamId == 0) {
        // 2-byte form: ID = second byte + 64
        if (buffer->readableBytes() < 2) return false;
        chunkStreamId = static_cast<uint8_t>(buffer->peek()[1]) + 64;
        basicHeaderSize = 2;
    } else if (chunkStreamId == 1) {
        // 3-byte form: ID = third byte * 256 + second byte + 64
        if (buffer->readableBytes() < 3) return false;
        chunkStreamId = static_cast<uint8_t>(buffer->peek()[2]) * 256 +
                        static_cast<uint8_t>(buffer->peek()[1]) + 64;
        basicHeaderSize = 3;
    }
    header->chunkStreamId = chunkStreamId;
    // Determine the Message Header size
    size_t messageHeaderSize = 0;
    switch (header->format) {
        case RtmpChunkFormat::kType0: messageHeaderSize = 11; break;
        case RtmpChunkFormat::kType1: messageHeaderSize = 7; break;
        case RtmpChunkFormat::kType2: messageHeaderSize = 3; break;
        case RtmpChunkFormat::kType3: messageHeaderSize = 0; break;
    }
    // Make sure enough data has arrived
    if (buffer->readableBytes() < basicHeaderSize + messageHeaderSize) {
        return false;
    }
    // Consume the Basic Header
    buffer->retrieve(basicHeaderSize);
    // Parse the Message Header
    if (messageHeaderSize > 0) {
        parseMessageHeader(buffer, header, messageHeaderSize);
    }
    return true;
}
3. AMF encoding implementation
// Encode an AMF Number
void AmfCodec::encodeNumber(Buffer* buffer, double number) {
    char data[8];
    uint64_t bits;
    std::memcpy(&bits, &number, 8);
    // Write in big-endian byte order
    for (int i = 0; i < 8; ++i) {
        data[i] = static_cast<char>((bits >> (56 - i * 8)) & 0xFF);
    }
    buffer->append(data, 8);
}
// Encode an AMF String
void AmfCodec::encodeString(Buffer* buffer, const std::string& str) {
    // String length (2 bytes, big-endian); strings longer than 65535 bytes
    // do not fit this field
    buffer->appendInt16(static_cast<int16_t>(str.size()));
    // String content
    buffer->append(str);
}
// Encode an AMF Object
void AmfCodec::encodeAmfObject(Buffer* buffer,
                               const std::unordered_map<std::string, AmfValue>& object) {
    // Object type marker
    buffer->appendInt8(static_cast<int8_t>(AmfType::kObject));
    // Encode the key/value pairs
    for (const auto& pair : object) {
        // Key (no type marker)
        buffer->appendInt16(static_cast<int16_t>(pair.first.size()));
        buffer->append(pair.first);
        // Value (with type marker)
        encodeAmfValue(buffer, pair.second);
    }
    // Object end marker
    buffer->appendInt16(0);  // empty key
    buffer->appendInt8(static_cast<int8_t>(AmfType::kObjectEnd));
}
4. Message serialization
void RtmpProtocolParser::serializeMessage(const RtmpMessage& message, Buffer* buffer) {
    const auto& header = message.header();
    const auto& payload = message.payload();
    // Choose the chunk stream ID
    uint32_t chunkStreamId = selectChunkStreamId(header.messageTypeId);
    // Number of chunks needed (ceiling division)
    uint32_t chunkCount = (payload.size() + chunkSize_ - 1) / chunkSize_;
    for (uint32_t i = 0; i < chunkCount; ++i) {
        // Payload size of the current chunk
        uint32_t chunkPayloadSize = std::min(chunkSize_,
            static_cast<uint32_t>(payload.size() - i * chunkSize_));
        // Write the chunk header
        if (i == 0) {
            // The first chunk uses Type 0
            writeChunkHeader(buffer, RtmpChunkFormat::kType0, chunkStreamId, header);
        } else {
            // Subsequent chunks use Type 3
            writeChunkHeader(buffer, RtmpChunkFormat::kType3, chunkStreamId, header);
        }
        // Write the chunk payload
        buffer->append(&payload[i * chunkSize_], chunkPayloadSize);
    }
}
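The chunk count and per-chunk payload size in the loop above follow from simple arithmetic, which this standalone sketch makes explicit. `splitIntoChunks` is a hypothetical helper for illustration; it only computes sizes and writes no headers.

```cpp
#include <algorithm>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Returns the payload size of each chunk when a message of `messageLength`
// bytes is split using a maximum chunk payload of `chunkSize` bytes.
std::vector<std::size_t> splitIntoChunks(std::size_t messageLength, std::size_t chunkSize) {
    if (chunkSize == 0) {
        throw std::invalid_argument("chunkSize must be positive");
    }
    std::vector<std::size_t> sizes;
    for (std::size_t offset = 0; offset < messageLength; offset += chunkSize) {
        sizes.push_back(std::min(chunkSize, messageLength - offset));
    }
    return sizes;
}
```

With the protocol default chunk size of 128, a 300-byte message becomes three chunks of 128, 128, and 44 bytes. With a large chunk size such as 60000, most audio and video frames fit into a single chunk, which saves the per-chunk header overhead.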
5. Media data inspection
class MediaPacket {
public:
    // Video frame type
    VideoFrameType videoFrameType() const {
        if (type_ != MediaPacketType::kVideo || data_.empty()) {
            return VideoFrameType::kInterFrame;
        }
        // FLV video tag: Frame Type (high 4 bits) + Codec ID (low 4 bits)
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t frameType = (firstByte >> 4) & 0x0F;
        return static_cast<VideoFrameType>(frameType);
    }
    // Video codec
    VideoCodec videoCodec() const {
        if (type_ != MediaPacketType::kVideo || data_.empty()) {
            return VideoCodec::kAVC;
        }
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t codecId = firstByte & 0x0F;
        return static_cast<VideoCodec>(codecId);
    }
    // Audio format
    AudioFormat audioFormat() const {
        if (type_ != MediaPacketType::kAudio || data_.empty()) {
            return AudioFormat::kAAC;
        }
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t format = (firstByte >> 4) & 0x0F;
        return static_cast<AudioFormat>(format);
    }
    // AAC sequence header detection
    bool isAacSequenceHeader() const {
        if (type_ != MediaPacketType::kAudio || data_.size() < 2) {
            return false;
        }
        // AAC sequence header: audio format == AAC and AAC packet type == 0
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t secondByte = static_cast<uint8_t>(data_[1]);
        uint8_t format = (firstByte >> 4) & 0x0F;
        uint8_t aacPacketType = secondByte;
        return (format == static_cast<uint8_t>(AudioFormat::kAAC)) &&
               (aacPacketType == 0);
    }
};
Monitoring and Logging
1. Log levels
// Set the log level
Logger::setLogLevel(Logger::LogLevel::DEBUG);
// Key log output
LOG_INFO << "RTMP Server started on " << config_.listenAddress << ":" << config_.listenPort;
LOG_DEBUG << "Received media packet: " << packet->data().size() << " bytes";
LOG_WARN << "Connection timeout: " << connectionId;
LOG_ERROR << "Failed to parse RTMP message";
2. Performance monitoring
struct ServerStatistics {
std::atomic<uint64_t> totalConnections{0};
std::atomic<uint64_t> currentConnections{0};
std::atomic<uint64_t> totalStreams{0};
std::atomic<uint64_t> currentStreams{0};
std::atomic<uint64_t> totalBytesReceived{0};
std::atomic<uint64_t> totalBytesSent{0};
std::atomic<uint64_t> totalPacketsReceived{0};
std::atomic<uint64_t> totalPacketsSent{0};
};
// Print statistics periodically
void printStatistics() {
// Bind by reference: a struct of std::atomic members cannot be copied
const auto& stats = server->getStatistics();
LOG_INFO << "=== RTMP Server Statistics ===";
LOG_INFO << "Connections: " << stats.currentConnections.load()
<< "/" << stats.totalConnections.load();
LOG_INFO << "Streams: " << stats.currentStreams.load()
<< "/" << stats.totalStreams.load();
LOG_INFO << "Bytes: RX=" << stats.totalBytesReceived.load()
<< " TX=" << stats.totalBytesSent.load();
LOG_INFO << "Packets: RX=" << stats.totalPacketsReceived.load()
<< " TX=" << stats.totalPacketsSent.load();
}
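Troubleshooting
1. Connection problems
Problem: the client connection times out
ERROR: Connection timeout: conn_12345
Solutions:
- Check the firewall settings
- Make sure port 1935 is not used by another process
- Check network connectivity
# Check that the port is listening
netstat -tlnp | grep 1935
# Test the connection
telnet localhost 1935
2. Handshake failure
Problem: the RTMP handshake fails
ERROR: Invalid handshake data from: conn_12345
Solutions:
- Check the client's RTMP version (it must be 3)
- Verify that the handshake data is complete
- Check that the network transport is stable
// Debug the handshake
LOG_DEBUG << "Handshake state: " << static_cast<int>(parser_.state());
LOG_DEBUG << "Handshake data size: " << buffer->readableBytes();
3. Audio/video synchronization problems
Problem: audio and video are out of sync
WARN: Audio-Video sync issue detected: 500ms
Solutions:
- Check the timestamp mapper configuration
- Adjust the synchronization threshold
- Check the GOP cache alignment
// Adjust the synchronization threshold
static const uint32_t SYNC_THRESHOLD = 500; // raised to 500 ms
// Inspect the audio and video timestamps (signed difference, so a negative
// value means audio is behind video)
LOG_DEBUG << "Audio timestamp: " << audioTimestamp
          << " Video timestamp: " << videoTimestamp
          << " Diff: " << (static_cast<int64_t>(audioTimestamp) - static_cast<int64_t>(videoTimestamp));
4. Debugging tips
1. Packet capture
# Capture packets with tcpdump
sudo tcpdump -i lo -w rtmp.pcap port 1935
# Analyze the capture with Wireshark
wireshark rtmp.pcap
2. Performance analysis
# Monitor CPU and memory with top
top -p $(pgrep rtmp_server)
# Find performance hot spots with perf
perf record -g ./bin/rtmp_server
perf report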
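Summary
This RTMP publish/play project implements a complete streaming media server. Its main features are:
- Complete RTMP protocol support: handshake, command handling, and media data transport
- High-performance networking: an asynchronous network framework built on the muduo library
- Audio/video synchronization: a timestamp mapper keeps playback continuous
- GOP cache optimization: fast first frame and low-latency playback
- Multi-stream support: multiple publishers and subscribers at the same time
- Error handling: anomaly detection and recovery mechanisms
With a solid understanding of the RTMP protocol and streaming technology, the server can be extended further, for example:
- Support for more codecs (H.265, Opus, and others)
- HLS/DASH output
- Stream recording
- User authentication and access control
- CDN distribution integration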