-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathevt.h
107 lines (96 loc) · 2.27 KB
/
evt.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#include "all.h"
struct List {
function<void(void*)> self;
List* next;
};
struct Node{
string evt_name;
void* data;
};
unordered_map<string, List*> on_user; // 监听者
queue<Node> task; // 消息队列
void evt_on(string evt_name,function<void(void*)> cb){
List* li = (List*)malloc(sizeof(List));
li->self = cb;
li->next = NULL;
if (on_user[evt_name] == NULL) {
on_user[evt_name] = li;
} else {
li->next = on_user[evt_name];
on_user[evt_name]= li;
}
}
mutex look;
void evt_emit(string evt_name,void* data){
look.lock();
Node n1;
n1.evt_name = evt_name;
n1.data = data;
task.push(n1);
look.unlock();
}
atomic<bool> isExit = {true};
void evt_close(){
if (isExit) {
isExit = false;
cout<<"event loop close"<<endl;
}
}
atomic_long idx = ATOMIC_FLAG_INIT;
class Chan {
public:
long id=0;
int data=0;
aco_t* write=NULL;
aco_t* read=NULL;
Chan() {
id = ++idx;
}
};
Chan c;
queue<Chan*> msg;
void c_write(Chan& c, int data){
// 暂停当前协程,在队列里等待写入到
msg.push(&c);
c.data = data;
c.write = aco_get_co();
aco_yield();
}
int c_read(Chan& c) {
// 暂停当前协程,在队列里等待读取到
c.read = aco_get_co();
aco_yield();
// 开一个线程恢复 write
return c.data;
}
void loop(){
Node n1;
Chan* c1;
aco_t* ch_read;
aco_t* ch_write;
while(isExit) {
usleep(1000);
look.unlock();
while(!task.empty()){
n1 = task.front();
List* li = on_user[n1.evt_name];
while(li != NULL) {
li->self(n1.data);
li = li->next;
}
task.pop();
}
while(!msg.empty()){
c1 = msg.front();
ch_read = c1->read;
ch_write = c1->write;
if(ch_read != NULL && ch_write != NULL) {
// 在线程中恢复协程
evt_emit("golang_channel", ch_read);
evt_emit("golang_channel", ch_write);
msg.pop();
}
}
look.unlock();
}
}