forked from ryokomy/python2-osc
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathosc_packet.py
More file actions
78 lines (62 loc) · 2.42 KB
/
osc_packet.py
File metadata and controls
78 lines (62 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""Use OSC packets to parse incoming UDP packets into messages or bundles.
It gives easy access to the OscMessage and OscBundle instances in the packet.
"""
import calendar
import collections
import time
from pythonosc.parsing import osc_types
from pythonosc import osc_bundle
from pythonosc import osc_message
# A namedtuple as returned by the _timed_msg_of_bundle function.
# 1) the system time at which the message should be executed
# in seconds since the epoch.
# 2) the actual message.
# Pairs an execution time (`time`) with the message to run (`message`).
TimedMessage = collections.namedtuple('TimedMessage', ['time', 'message'])
def _timed_msg_of_bundle(bundle, now):
    """Returns messages contained in nested bundles as a list of TimedMessage.

    Args:
      - bundle: the bundle to flatten; iterating it yields OscMessage instances
        and/or nested bundles, and it exposes a `timestamp` attribute.
      - now: the reference time that bundle timestamps are compared against.
    Returns:
      A list of TimedMessage in traversal order (the list is not sorted here).
    """
    # Every message sitting directly in this bundle shares the same execution
    # time, so work it out once: bundles flagged IMMEDIATELY or whose timestamp
    # is already in the past are due `now`, otherwise at the bundle timestamp.
    timestamp = bundle.timestamp
    if timestamp == osc_types.IMMEDIATELY or timestamp < now:
        due = now
    else:
        due = timestamp

    msgs = []
    for content in bundle:
        # isinstance (rather than an exact type comparison) so that subclasses
        # of OscMessage are treated as messages instead of being recursed into.
        if isinstance(content, osc_message.OscMessage):
            msgs.append(TimedMessage(due, content))
        else:
            # Anything that is not a message is handled as a nested bundle,
            # which carries its own timestamp.
            msgs.extend(_timed_msg_of_bundle(content, now))
    return msgs
class ParseError(Exception):
    """Base error raised when a packet could not be parsed."""
class OscPacket(object):
    """Unit of transmission of the OSC protocol.

    Any application that sends OSC Packets is an OSC Client.
    Any application that receives OSC Packets is an OSC Server.
    """

    def __init__(self, dgram):
        """Initialize an OscPacket with the given UDP datagram.

        Args:
          - dgram: the raw UDP datagram holding the OSC packet.
        Raises:
          - ParseError if the datagram could not be parsed, or if it is
            neither an OSC bundle nor an OSC message.
        """
        # Current time as whole seconds since the epoch (sub-second precision
        # is dropped by time.gmtime()). Used as the execution time of a
        # standalone message and of bundle content that is due immediately or
        # already overdue.
        now = calendar.timegm(time.gmtime())
        try:
            if osc_bundle.OscBundle.dgram_is_bundle(dgram):
                # Flatten nested bundles, then order by execution time.
                # NOTE: this branch stores a list, the message branch a tuple.
                self._messages = sorted(
                    _timed_msg_of_bundle(osc_bundle.OscBundle(dgram), now),
                    key=lambda x: x.time)
            elif osc_message.OscMessage.dgram_is_message(dgram):
                self._messages = (
                    TimedMessage(now, osc_message.OscMessage(dgram)),)
            else:
                # Empty packet, should not happen as per the spec but heh, UDP...
                raise ParseError(
                    'OSC Packet should at least contain an OscMessage or an '
                    'OscBundle.')
        except (osc_bundle.ParseError, osc_message.ParseError) as pe:
            # Re-raise lower-level parsing failures as this module's ParseError
            # so callers only have one exception type to handle.
            raise ParseError('Could not parse packet %s' % pe)

    @property
    def messages(self):
        """Returns asc-time-sorted TimedMessages of the messages in this packet.

        The value is a list when the packet was a bundle and a one-element
        tuple when the packet was a single message.
        """
        return self._messages