博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
libuv实现tcp代理服务器
阅读量:2135 次
发布时间:2019-04-30

本文共 8795 字,大约阅读时间需要 29 分钟。

目录

概述

为了学习libuv的tcp相关函数使用,实现了一个tcp代理服务。

1 启动TCP服务

2 如果有新的连接X,则向代理地址建立连接Y,并关联X和Y成组
3 如果X收到数据包,发往Y;反之如果Y收到数据包发往X
4 如果X或者Y连接异常,关闭此组连接
5 代理服务退出时,清理所有组,清理TCP服务

API简介

uv_err_name

出现错误可以通过uv_strerror和uv_err_name找错误原因。

int r = uv_accept(server, stream);fprintf(stderr, "uv_accept error %s\n", uv_err_name(r));

uv_ip4_addr

struct sockaddr_in m_local;struct sockaddr_in m_remote;...uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &m_local);

将给定的ip地址和端口转换成sockaddr_in结构体,原生编程的时候,设置ip和端口需要至少五行,用这个方法可以简化操作

uv_tcp_init

初始化 tcp 对象

uv_loop_t* m_loop;...m_loop = uv_default_loop();...uv_tcp_t m_tcpServer;...uv_tcp_init(m_loop, &m_tcpServer);//初始化tcp server对象

uv_tcp_bind

等同于原生API的 bind() 方法

uv_tcp_bind(&m_tcpServer, (const struct sockaddr*) &m_local, 0);

uv_tcp_bind() 的第三个参数 flag 一般是0,如果想使用IP6,可以使用 UV_TCP_IPV6ONLY

enum uv_tcp_flags {
/* Used with uv_tcp_bind, when an IPv6 address is used. */ UV_TCP_IPV6ONLY = 1};

uv_listen

与listen作用类似,增加了回调函数,有新连接来时会触发回调。

uv_listen((uv_stream_t*)&m_tcpServer, 50, OnAccept);

uv_accept

在OnAccept回调中执行uv_accept,用socket与stream关联,stream需要先分配内存和初始化。

int r;uv_stream_t* stream;...uv_tcp_init(pProxy->m_loop, (uv_tcp_t*) stream);...r = uv_accept(server, stream);uv_err_name(r)

源码演示

///*在入口函数中包含 _CrtDumpMemoryLeaks();  即可检测到内存泄露*/#define CRTDBG_MAP_ALLOC  #include 
#include
//#include
//for kbhit()#include "stdafx.h"#include "uv.h"#include
#include
using namespace std;#define container_of(ptr, type, member) \ ((type *) ((char *) (ptr) - offsetof(type, member)))typedef void* (*proc_func)(char* buff, size_t size);class CProxy{
public: CProxy(const char* sip, int sport, const char* dip, int dport, uv_loop_t* loop = NULL) {
uv_ip4_addr(sip, sport, &m_local); uv_ip4_addr(dip, dport, &m_remote); if (NULL == loop) {
m_loop = uv_default_loop(); } else {
m_loop = loop; } } int Start() {
int r = 0; r = uv_tcp_init(m_loop, &m_tcpServer); if (r) {
return r; } r = uv_tcp_bind(&m_tcpServer, (const struct sockaddr*) &m_local, 0); if (r) {
uv_err_name(r); return r; } r = uv_listen((uv_stream_t*)&m_tcpServer, 50, OnAccept); if (r) {
return r; } m_tcpServer.data = this; return 0; } void ShowAllPair() {
map
::iterator iter = m_mapStream.begin(); for (; iter != m_mapStream.end(); ++iter) { char ip[3][64]; int port[3]; GetAddrFromStream(iter->first, ip[0], port[0]); GetAddrFromStream(iter->second, ip[1], port[1]); GetAddrFromStream(iter->second, ip[2], port[2], 0); printf("proxy %s:%d <--> %s:%d <--> %s:%d connected!\n", ip[0], port[0], ip[2], port[2], ip[1], port[1]); } } void Stop() { map
::iterator iter = m_mapStream.begin(); for (; iter != m_mapStream.end(); ++iter) { uv_read_stop(iter->first); uv_read_stop(iter->second); DeletePairStream(iter->first); } StopServer(); m_mapStream.clear(); }private: struct context { CProxy* pProxy; uv_tcp_t tcpStream; }; void StopServer() { uv_close((uv_handle_t*) &m_tcpServer, NULL); } static char* GetAddrFromStream(uv_stream_t* stream, char* sip, int& port, int flag = 1) { char ip[64] = { 0}; struct sockaddr_in addrFrom; int len = sizeof(struct sockaddr_in); if(flag == 1) { uv_tcp_getpeername((uv_tcp_t*)stream, (struct sockaddr*)&addrFrom, &len); } else { uv_tcp_getsockname((uv_tcp_t*)stream, (struct sockaddr*)&addrFrom, &len); } uv_ip4_name((const struct sockaddr_in*)&addrFrom, ip, sizeof(ip)); if (sip) { strcpy(sip, ip); } port = ntohs(addrFrom.sin_port); return sip; } static uv_stream_t* FindPairStream(uv_stream_t* stream) { return (uv_stream_t*)stream->data; } static void AddPairStream(uv_stream_t* stream, uv_stream_t* stream2) { struct context* ctx = container_of(stream, struct context, tcpStream); CProxy* pProxy = ctx->pProxy; pProxy->m_mapStream.insert( pair< uv_stream_t*, uv_stream_t* > (stream, stream2) ); char ip[3][64]; int port[3]; GetAddrFromStream(stream, ip[0], port[0]); GetAddrFromStream(stream2, ip[1], port[1]); GetAddrFromStream(stream2, ip[2], port[2], 0); printf("proxy %s:%d <--> %s:%d <--> %s:%d connected!\n", ip[0], port[0], ip[2], port[2], ip[1], port[1]); } static void RemovePairStream(uv_stream_t* stream) { char ip[3][64]; int port[3]; uv_stream_t* stream2 = (uv_stream_t*)stream->data; struct context* ctx = container_of(stream, struct context, tcpStream); CProxy* pProxy = ctx->pProxy; map
::iterator iter = pProxy->m_mapStream.find(stream); if (iter != pProxy->m_mapStream.end()) { GetAddrFromStream(stream, ip[0], port[0]); GetAddrFromStream(stream2, ip[1], port[1]); GetAddrFromStream(stream2, ip[2], port[2], 0); pProxy->m_mapStream.erase(iter); } iter = pProxy->m_mapStream.find(stream2); if (iter != pProxy->m_mapStream.end()) { GetAddrFromStream(stream2, ip[0], port[0]); GetAddrFromStream(stream, ip[1], port[1]); GetAddrFromStream(stream, ip[2], port[2], 0); pProxy->m_mapStream.erase(iter); } printf("proxy %s:%d <--> %s:%d <--> %s:%d closed!\n", ip[0], port[0], ip[2], port[2], ip[1], port[1]); } static void OnAccept(uv_stream_t* server, int status) { CProxy* pProxy = (CProxy*)server->data; uv_stream_t* stream; int r; if (status != 0) { fprintf(stderr, "Connect error %s\n", uv_err_name(status)); return; } struct context* ctx = new struct context; if (ctx == NULL) { return; } ctx->pProxy = pProxy; stream = (uv_stream_t*)&ctx->tcpStream; r = uv_tcp_init(pProxy->m_loop, (uv_tcp_t*) stream); if (r) { return; } r = uv_accept(server, stream); if (r) { return; } uv_connect_t* con = new uv_connect_t; if (con == NULL) { return; } con->data = server->data; struct context* ctx2 = new struct context; if (ctx2 == NULL) { return; } ctx2->pProxy = pProxy; uv_stream_t* streamTo = (uv_stream_t*)&ctx2->tcpStream; streamTo->data = stream; r = uv_tcp_init(pProxy->m_loop, (uv_tcp_t*) streamTo); if (r) { return; } uv_tcp_connect(con, (uv_tcp_t*)streamTo, (const struct sockaddr*) &pProxy->m_remote, OnConnect); } static void FreePointer(void** p) { delete (*p); *p = NULL; } static void FreePointer(char** p) { if (p && *p) { delete [] (*p); *p = NULL; } } static void OnCloseAndClear(uv_handle_t* peer) { printf("OnCloseAndClear %p\n", peer); uv_stream_t* stream = (uv_stream_t*)peer; struct context* ctx = container_of(stream, struct context, tcpStream); FreePointer((void**)&ctx); } static void DeletePairStream(uv_stream_t* stream) { uv_stream_t* stream2 = (uv_stream_t*)stream->data; uv_close((uv_handle_t*) stream, OnCloseAndClear); uv_close((uv_handle_t*) stream2, OnCloseAndClear); } static void OnConnect(uv_connect_t* req, int status) { CProxy* pProxy = (CProxy*)req->data; uv_stream_t* streamTo = req->handle; uv_stream_t* stream = (uv_stream_t*)streamTo->data; stream->data = streamTo; if (status < 0) { RemovePairStream(stream); DeletePairStream(stream); FreePointer((void**)&req); return; } AddPairStream(stream, streamTo); /*建立一组连接,使能接收数据,并设置内存申请,读数据的回调*/ uv_read_start(stream, Balloc, OnRead); uv_read_start(streamTo, Balloc, OnRead); FreePointer((void**)&req); } static void Balloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { buf->base = new char[suggested_size]; buf->len = suggested_size; } static void OnRead(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { uv_stream_t* streamTo = handle; uv_stream_t* stream = FindPairStream(streamTo); if (nread < 0) { FreePointer((char**)&buf->base); RemovePairStream(stream); DeletePairStream(stream); return; } if (nread == 0) { FreePointer((char**)&buf->base); return; } uv_write_t *pw = new uv_write_t; if (pw == NULL) { FreePointer((char**)&buf->base); return; } uv_buf_t* bufw = new uv_buf_t; if (bufw == NULL) { FreePointer((char**)&buf->base); return; } bufw->base = buf->base; bufw->len = nread; pw->data = (void*)bufw; uv_write(pw, stream, bufw, 1, OnWrite); } static void OnWrite(uv_write_t* req, int status) { uv_buf_t* buf = (uv_buf_t*)req->data; FreePointer((char**)&buf->base); FreePointer((void**)&buf); FreePointer((void**)&req); if (status == 0) return; printf("uv_write error: %s - %s\n", uv_err_name(status), uv_strerror(status)); }private: struct sockaddr_in m_local; struct sockaddr_in m_remote; uv_tcp_t m_tcpServer; uv_loop_t* m_loop; map
m_mapStream;};int _tmain(int argc, _TCHAR* argv[]){ uv_loop_t* loop = uv_default_loop(); CProxy *p = new CProxy("0.0.0.0", 50000, "192.168.10.31", 60000, loop); p->Start(); while(!_kbhit()) { uv_run(loop, UV_RUN_NOWAIT); } p->Stop(); /*给libuv库清理相关的handles等*/ while(uv_loop_alive(loop) != 0) { uv_run(loop, UV_RUN_NOWAIT); } printf("uv_loop_close before\n"); uv_loop_close(loop); uv_library_shutdown(); delete p; _CrtDumpMemoryLeaks(); while (!_kbhit()); return 0;}

转载地址:http://omkgf.baihongyu.com/

你可能感兴趣的文章
flask_migrate
查看>>
解决activemq多消费者并发处理
查看>>
UDP连接和TCP连接的异同
查看>>
hibernate 时间段查询
查看>>
java操作cookie 实现两周内自动登录
查看>>
Tomcat 7优化前及优化后的性能对比
查看>>
Java Guava中的函数式编程讲解
查看>>
Eclipse Memory Analyzer 使用技巧
查看>>
tomcat连接超时
查看>>
谈谈编程思想
查看>>
iOS MapKit导航及地理转码辅助类
查看>>
检测iOS的网络可用性并打开网络设置
查看>>
简单封装FMDB操作sqlite的模板
查看>>
iOS开发中Instruments的用法
查看>>
iOS常用宏定义
查看>>
什么是ActiveRecord
查看>>
有道词典for mac在Mac OS X 10.9不能取词
查看>>
关于“团队建设”的反思
查看>>
利用jekyll在github中搭建博客
查看>>
Windows7中IIS简单安装与配置(详细图解)
查看>>