forked from ryokomy/python2-osc
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathosc_server.py
More file actions
157 lines (121 loc) · 5.2 KB
/
osc_server.py
File metadata and controls
157 lines (121 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""OSC Servers that receive UDP packets and invoke handlers accordingly.

Use like this:

    dispatcher = dispatcher.Dispatcher()
    # This will print all parameters to stdout.
    dispatcher.map("/bpm", print)
    server = ForkingOSCUDPServer((ip, port), dispatcher)
    server.serve_forever()

or run the server on its own thread:

    server = ForkingOSCUDPServer((ip, port), dispatcher)
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.start()
    ...
    server.shutdown()

These servers are built on the socketserver module from the standard library:
https://docs.python.org/3/library/socketserver.html
"""
import calendar
try:
import socketserver
except ImportError:
import SocketServer as socketserver
import time
from pythonosc import osc_bundle
from pythonosc import osc_message
from pythonosc import osc_packet
class _UDPHandler(socketserver.BaseRequestHandler):
    """Dispatches the OSC messages found in one UDP datagram.

    Whether this runs on the server's thread, on its own thread or in a whole
    new process depends on the server class that was instantiated; see their
    documentation.

    The servers in this module only hand a datagram to this handler after
    ``verify_request`` accepted it, i.e. after checking that it looks like an
    OSC message or bundle. Anything else is rejected before any new
    thread/process is spawned.

    The handlers registered with the server's dispatcher are called for every
    message found in the packet. The process/thread granularity is thus the
    OSC packet, not the handler.

    If parameters were registered with the dispatcher, the handlers are
    called this way:

        handler('/address that triggered the message',
                registered_param_list, osc_msg_arg1, osc_msg_arg2, ...)

    If no parameters were registered, they are called like this:

        handler('/address that triggered the message',
                osc_msg_arg1, osc_msg_arg2, osc_msg_arg3, ...)
    """

    def handle(self):
        """Parse the received datagram and invoke the matching handlers.

        ``self.request[0]`` holds the raw datagram. Messages whose address
        has no registered handler are skipped. A message whose time lies in
        the future blocks this handler (``time.sleep``) until that time is
        reached, then its handlers run. A datagram that fails to parse is
        dropped silently.
        """
        data = self.request[0]
        # Get OSC messages from all bundles or standalone message.
        try:
            packet = osc_packet.OscPacket(data)
            for timed_msg in packet.messages:
                # time.time() is seconds since the epoch *with* its fractional
                # part. A clock truncated to whole seconds would make delayed
                # messages sleep up to one second too long.
                now = time.time()
                address = timed_msg.message.address
                handlers = self.server.dispatcher.handlers_for_address(address)
                if not handlers:
                    continue
                # If the message is to be handled later, then so be it.
                if timed_msg.time > now:
                    time.sleep(timed_msg.time - now)
                for handler in handlers:
                    if handler.args:
                        handler.callback(
                            address, handler.args, *timed_msg.message)
                    else:
                        handler.callback(address, *timed_msg.message)
        except osc_packet.ParseError:
            # Deliberate best effort: a malformed datagram must not take the
            # server down, so it is simply ignored.
            pass
def _is_valid_request(request):
    """Tell whether a request's payload looks like an OSC bundle or message.

    Args:
        request: The request object handed over by the UDP server; its first
            element is the raw datagram.

    Returns:
        A truthy value when the datagram is recognised as an OSC bundle or,
        failing that, as an OSC message; a falsy value otherwise.
    """
    datagram = request[0]
    # The message check only runs when the bundle check did not match.
    looks_like_bundle = osc_bundle.OscBundle.dgram_is_bundle(datagram)
    return looks_like_bundle or osc_message.OscMessage.dgram_is_message(datagram)
class BlockingOSCUDPServer(socketserver.UDPServer, object):
    """Blocking flavour of the OSC UDP server.

    Every datagram is handled sequentially on the thread that runs the
    server. Use this if you don't care about latency in your message handling
    or don't have a multiprocess/multithread environment.
    """

    def __init__(self, server_address, dispatcher):
        """Set up the UDP server and remember the dispatcher.

        Args:
            server_address: Address to bind to, forwarded unchanged to the
                underlying UDP server (e.g. an ``(ip, port)`` tuple).
            dispatcher: Object the request handler queries for the handlers
                registered for an OSC address.
        """
        parent = super(BlockingOSCUDPServer, self)
        parent.__init__(server_address, _UDPHandler)
        self._dispatcher = dispatcher

    @property
    def dispatcher(self):
        """The dispatcher given at construction time (read-only)."""
        return self._dispatcher

    def verify_request(self, request, client_address):
        """Accept only requests whose data looks like a valid OSC datagram."""
        return _is_valid_request(request)
class ThreadingOSCUDPServer(
        socketserver.ThreadingMixIn, socketserver.UDPServer, object):
    """Threaded flavour of the OSC UDP server.

    Every accepted datagram is handled in a new thread of its own. Use this
    when the message handlers only perform lightweight operations.
    """

    def __init__(self, server_address, dispatcher):
        """Set up the UDP server and remember the dispatcher.

        Args:
            server_address: Address to bind to, forwarded unchanged to the
                underlying UDP server (e.g. an ``(ip, port)`` tuple).
            dispatcher: Object the request handler queries for the handlers
                registered for an OSC address.
        """
        parent = super(ThreadingOSCUDPServer, self)
        parent.__init__(server_address, _UDPHandler)
        self._dispatcher = dispatcher

    @property
    def dispatcher(self):
        """The dispatcher given at construction time (read-only)."""
        return self._dispatcher

    def verify_request(self, request, client_address):
        """Accept only requests whose data looks like a valid OSC datagram."""
        return _is_valid_request(request)
# socketserver only defines ForkingMixIn on platforms that provide os.fork().
# Guarding the definition keeps this module importable on the others (e.g.
# Windows), where the forking server is simply not offered.
if hasattr(socketserver, "ForkingMixIn"):

    class ForkingOSCUDPServer(
            socketserver.ForkingMixIn, socketserver.UDPServer, object):
        """Forking version of the OSC UDP server.

        Each accepted datagram is handled in a new process of its own. Use
        this when the message handlers perform heavyweight operations and
        forking a whole new process for each of them is worth it.

        Only defined on platforms where ``socketserver.ForkingMixIn`` exists.
        """

        def __init__(self, server_address, dispatcher):
            """Set up the UDP server and remember the dispatcher.

            Args:
                server_address: Address to bind to, forwarded unchanged to
                    the underlying UDP server (e.g. an ``(ip, port)`` tuple).
                dispatcher: Object the request handler queries for the
                    handlers registered for an OSC address.
            """
            super(ForkingOSCUDPServer, self).__init__(
                server_address, _UDPHandler)
            self._dispatcher = dispatcher

        def verify_request(self, request, client_address):
            """Returns true if the data looks like a valid OSC UDP datagram."""
            return _is_valid_request(request)

        @property
        def dispatcher(self):
            """Dispatcher accessor for handlers to dispatch osc messages."""
            return self._dispatcher