-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathpbuffer.py
More file actions
182 lines (155 loc) · 3.74 KB
/
pbuffer.py
File metadata and controls
182 lines (155 loc) · 3.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
##
# .protocol.pbuffer
##
"""
Pure Python message buffer implementation.
Given data read from the wire, buffer the data until a complete message has been
received.
"""
__all__ = ['pq_message_stream']
from io import BytesIO
import struct
from .message_types import message_types
# Header decoder: '!' selects network (big-endian) byte order, 'x' skips the
# leading byte, and 'L' reads the following 4-byte unsigned length.
# Returns a 1-tuple, hence the `length, = xl_unpack(header)` unpacking below.
xl_unpack = struct.Struct('!xL').unpack_from
class pq_message_stream(object):
    """
    Provide a message stream from a data stream.

    Raw bytes are appended with `write`; complete messages are taken out with
    `next_message`, `read`, or by iterating over the stream. On the wire each
    message is a 5-byte header (one type byte followed by a 4-byte big-endian
    length that counts itself but not the type byte) followed by the body.
    Messages are returned as ``(type, body)`` tuples.
    """
    # Chunk size used when shifting unread data to the front of the buffer.
    _block = 512
    # Once more than this many consumed bytes have piled up at the front of
    # the buffer, the next read operation compacts it.
    _limit = _block * 4

    def __init__(self):
        self._strio = BytesIO()
        # Offset of the first byte that has not yet been handed out.
        self._start = 0

    def truncate(self):
        """
        Remove all data in the buffer.
        """
        self._strio.truncate(0)
        self._start = 0

    def _rtruncate(self, amt = None):
        """
        [internal] remove the given amount of data from the buffer's front.

        When `amt` is None, the current position of the underlying buffer is
        used. The remaining data is shifted down `_block` bytes at a time so
        no second full-size copy of the buffer is created.
        """
        strio = self._strio
        if amt is None:
            amt = strio.tell()
        strio.seek(0, 2)
        size = strio.tell()
        # If everything (or more than everything) is being removed, there is
        # nothing to shift; just clear the buffer.
        if amt >= size:
            strio.truncate(0)
            return
        block = self._block
        copyto_pos = 0
        copyfrom_pos = amt
        while True:
            strio.seek(copyfrom_pos)
            data = strio.read(block)
            # Next copyfrom
            copyfrom_pos = strio.tell()
            strio.seek(copyto_pos)
            strio.write(data)
            # A short read means the end of the buffer was reached.
            if len(data) != block:
                break
            # Next copyto
            copyto_pos = strio.tell()
        strio.truncate(size - amt)

    def _compact(self):
        """
        [internal] drop already-consumed data once it exceeds `_limit`.
        """
        if self._start > self._limit:
            self._rtruncate(self._start)
            self._start = 0

    def has_message(self, xl_unpack = xl_unpack, len = len):
        """
        Whether the buffer has a message available.

        Raises ValueError if the next header declares a length below 4.
        """
        strio = self._strio
        strio.seek(self._start)
        header = strio.read(5)
        if len(header) < 5:
            return False
        length, = xl_unpack(header)
        if length < 4:
            raise ValueError("invalid message size '%d'" %(length,))
        strio.seek(0, 2)
        # The declared length excludes the type byte, hence the + 1.
        return (strio.tell() - self._start) >= length + 1

    def __len__(self, xl_unpack = xl_unpack, len = len):
        """
        Number of complete messages in buffer.

        Raises ValueError if a header declares a length below 4.
        """
        count = 0
        strio = self._strio
        strio.seek(self._start)
        while True:
            # get the message metadata
            header = strio.read(5)
            if len(header) < 5:
                # not enough data for another message
                break
            # unpack the length from the header
            length, = xl_unpack(header)
            if length < 4:
                raise ValueError("invalid message size '%d'" %(length,))
            # Jump to the final byte of the message and try to read it; a
            # short read means the body is incomplete. For an empty body the
            # relative seek steps back onto the last header byte, so the read
            # succeeds and leaves the position at the next header.
            strio.seek(length - 4 - 1, 1)
            if len(strio.read(1)) != 1:
                break
            count += 1
        return count

    def _get_message(self,
        mtypes = message_types,
        len = len,
        xl_unpack = xl_unpack,
    ):
        """
        [internal] read one message at the buffer's current position.

        Returns a ``(type, body)`` tuple, or None when the header or body is
        not completely buffered yet. Raises ValueError if the header declares
        a length below 4. The caller is responsible for positioning the
        buffer and for advancing `_start`.
        """
        strio = self._strio
        header = strio.read(5)
        if len(header) < 5:
            return
        length, = xl_unpack(header)
        typ = mtypes[header[0]]
        if length < 4:
            raise ValueError("invalid message size '%d'" %(length,))
        # The declared length includes its own four bytes.
        length -= 4
        body = strio.read(length)
        if len(body) < length:
            # Not enough data for message.
            return
        return (typ, body)

    def next_message(self):
        """
        Return the next ``(type, body)`` message, or None if no complete
        message is buffered.
        """
        self._compact()
        self._strio.seek(self._start)
        msg = self._get_message()
        if msg is not None:
            self._start = self._strio.tell()
        return msg

    def __iter__(self):
        """
        The stream is its own iterator; see `__next__`.
        """
        return self

    def __next__(self):
        """
        Return the next ``(type, body)`` message; raise StopIteration when no
        complete message is buffered.
        """
        self._compact()
        self._strio.seek(self._start)
        msg = self._get_message()
        if msg is None:
            raise StopIteration
        self._start = self._strio.tell()
        return msg

    def read(self, num = 0xFFFFFFFF, len = len):
        """
        Return a list of up to `num` complete ``(type, body)`` messages,
        consuming them from the buffer.
        """
        self._compact()
        new_start = self._start
        self._strio.seek(new_start)
        msgs = []
        while len(msgs) < num:
            msg = self._get_message()
            if msg is None:
                break
            msgs.append(msg)
            # Header (5 bytes) plus body; a failed read never advances this.
            new_start += (5 + len(msg[1]))
        self._start = new_start
        return msgs

    def write(self, data):
        """
        Append `data` to the end of the buffer.
        """
        # Reads move the position around, so always seek to the end first.
        self._strio.seek(0, 2)
        self._strio.write(data)

    def getvalue(self):
        """
        Return all data that has not been consumed yet.
        """
        self._strio.seek(self._start)
        return self._strio.read()