-
Notifications
You must be signed in to change notification settings - Fork 88
/
Copy pathbinlogstream.py.diff
114 lines (107 loc) · 4.65 KB
/
binlogstream.py.diff
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
--- ./binlogstream.py 2016-05-20 16:29:08.000000000 +0800
+++ ../../pymysqlreplication/binlogstream.py 2016-11-08 17:51:42.000000000 +0800
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-
+import pdb
import pymysql
import struct
@@ -125,7 +125,7 @@
"""Connect to replication stream and read event
"""
report_slave = None
-
+ #changed by yilai,change parameter
def __init__(self, connection_settings, server_id, resume_stream=False,
blocking=False, only_events=None, log_file=None, log_pos=None,
filter_non_implemented_events=True,
@@ -133,7 +133,7 @@
only_tables=None, only_schemas=None,
freeze_schema=False, skip_to_timestamp=None,
report_slave=None, slave_uuid=None,
- pymysql_wrapper=None):
+ pymysql_wrapper=None,end_to_timestamp=None,process_interval=0):
"""
Attributes:
resume_stream: Start for event from position or the latest event of
@@ -188,6 +188,11 @@
self.pymysql_wrapper = pymysql_wrapper
else:
self.pymysql_wrapper = pymysql.connect
+ #added by yilai
+ #self.current_position=-1
+ self.end_to_timestamp=end_to_timestamp
+ self.event_count=0
+ self.process_interval=process_interval
def close(self):
if self.__connected_stream:
@@ -346,7 +351,7 @@
prelude += struct.pack('<I', gtid_set.encoded_length)
# encoded_data
prelude += gtid_set.encoded()
-
+ #pdb.set_trace()
if pymysql.__version__ < "0.6":
self._stream_connection.wfile.write(prelude)
self._stream_connection.wfile.flush()
@@ -357,6 +362,7 @@
def fetchone(self):
while True:
+
if not self.__connected_stream:
self.__connect_to_stream()
@@ -378,10 +384,18 @@
if pkt.is_eof_packet():
self.close()
return None
+ "add by yilai"
+ self.event_count += 1
+ if self.process_interval>0:
+ if (self.event_count % self.process_interval)==0:
+ print("scan {0} events ....".format(self.event_count))
if not pkt.is_ok_packet():
continue
-
+ #yilai,for debug
+ #print("-----begin event-----")
+ #print(':'.join(x.encode('hex') for x in pkt._data))
+ #print(pkt._data)
binlog_event = BinLogPacketWrapper(pkt, self.table_map,
self._ctl_connection,
self.__use_checksum,
@@ -389,6 +403,14 @@
self.__only_tables,
self.__only_schemas,
self.__freeze_schema)
+ #added by yilai
+ #print("*****timestamp{0},next pos{1},current pos{2}".format(binlog_event.timestamp,
+ # binlog_event.log_pos,self.current_position))
+ #self.current_position=binlog_event.log_pos
+ #pdb.set_trace()
+ #added by yilai
+ if self.end_to_timestamp and binlog_event.timestamp > self.end_to_timestamp:
+ return None
if self.skip_to_timestamp and binlog_event.timestamp < self.skip_to_timestamp:
continue
@@ -401,6 +423,7 @@
if binlog_event.event_type == ROTATE_EVENT:
self.log_pos = binlog_event.event.position
self.log_file = binlog_event.event.next_binlog
+ #binlog_event.event.dump()
# Table Id in binlog are NOT persistent in MySQL - they are in-memory identifiers
# that means that when MySQL master restarts, it will reuse same table id for different tables
# which will cause errors for us since our in-memory map will try to decode row data with
@@ -457,12 +480,13 @@
self.__connect_to_ctl()
cur = self._ctl_connection.cursor()
+ #changed by yilai,add schema name "information_schema" for rds in alibaba
cur.execute("""
SELECT
COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY
FROM
- columns
+ information_schema.columns
WHERE
table_schema = %s AND table_name = %s
""", (schema, table))