Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion splitio/push/splitsse.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ def connect(url):
try:
self._client.start(url, timeout=self.KEEPALIVE_TIMEOUT)
finally:
self._sse_connection_closed.set()
self._status = SplitSSEClient._Status.IDLE
self._sse_connection_closed.set()

url = self._build_url(token)
task = threading.Thread(target=connect, name='SSEConnection', args=(url,))
Expand Down
2 changes: 1 addition & 1 deletion splitio/push/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def build(self):
class SSEClient(object):
"""SSE Client implementation."""

_DEFAULT_HEADERS = {'Accept': 'text/event-stream'}
_DEFAULT_HEADERS = {'accept': 'text/event-stream'}
_EVENT_SEPARATORS = set([b'\n', b'\r\n'])

def __init__(self, callback):
Expand Down
File renamed without changes.
230 changes: 230 additions & 0 deletions tests/helpers/mockserver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
"""SSE mock server."""
import json
from collections import namedtuple
import queue
import threading

from http.server import HTTPServer, BaseHTTPRequestHandler


# Record of one incoming HTTP request as captured by the mock handlers below.
# The handlers in this file fill `headers` with a plain dict built from the
# request headers and pass `None` as `body` when no payload was read.
Request = namedtuple('Request', ['method', 'path', 'headers', 'body'])


class SSEMockServer(object):
    """
    SSE server for testing purposes.

    Wraps an ``HTTPServer`` bound to an OS-assigned port on localhost and runs
    it on a daemon thread. Events handed to :meth:`publish` are placed on an
    internal queue which is shared with every ``SSEHandler`` created for an
    incoming request.
    """

    # NOTE(review): this attribute lives on the server wrapper, not on the
    # request handler class, so it presumably has no effect on the protocol
    # version used in responses -- confirm before relying on it.
    protocol_version = 'HTTP/1.1'

    # Sentinel events: publishing one of these instead of an event dict tells
    # the handler to finish the response cleanly or to blow up mid-stream.
    GRACEFUL_REQUEST_END = 'REQ-END'
    VIOLENT_REQUEST_END = 'REQ-KILL'

    def __init__(self, req_queue=None):
        """
        Construct a mock server.

        :param req_queue: optional queue on which handlers record each
                          incoming request.
        """
        self._queue = queue.Queue()
        # Port 0 lets the OS pick a free port; see `port()`.
        self._server = HTTPServer(
            ('localhost', 0),
            lambda *xs: SSEHandler(self._queue, *xs, req_queue=req_queue))
        self._server_thread = threading.Thread(target=self._blocking_run)
        # `Thread.setDaemon()` is deprecated; the attribute is the supported form.
        self._server_thread.daemon = True
        self._done_event = threading.Event()

    def _blocking_run(self):
        """Serve requests until shutdown, then signal completion."""
        try:
            self._server.serve_forever()
        finally:
            # Always release waiters, even if the serve loop dies unexpectedly.
            self._done_event.set()

    def port(self):
        """Return the assigned port."""
        return self._server.server_port

    def publish(self, event):
        """
        Publish an event.

        :param event: dict with any of the 'id', 'event', 'retry', 'data'
                      keys, or one of the ``*_REQUEST_END`` sentinels.
        """
        self._queue.put(event, block=False)

    def start(self):
        """Start the server asynchronously."""
        self._server_thread.start()

    def wait(self, timeout=None):
        """
        Wait for the server to shutdown.

        :param timeout: maximum number of seconds to wait (None = forever).
        :return: True if the server loop has finished, False on timeout.
        """
        return self._done_event.wait(timeout)

    def stop(self):
        """Stop the server and release its listening socket."""
        self._server.shutdown()
        # `shutdown()` only stops the serve loop; without `server_close()` the
        # listening socket stays open until garbage collection.
        self._server.server_close()


class SSEHandler(BaseHTTPRequestHandler):
    """
    Request handler that streams queued events as a chunked SSE response.

    Each GET request is answered with a ``text/event-stream`` response whose
    body is fed from the shared event queue until an end sentinel is read.
    """

    # Order in which event fields are serialized.
    _EVENT_FIELDS = ('id', 'event', 'retry', 'data')

    def __init__(self, event_queue, *args, **kwargs):
        """
        Construct a handler.

        :param event_queue: queue from which events to be sent are read.
        :param args: positional arguments forwarded to the base handler.
        :param kwargs: only 'req_queue' is read; when present, each incoming
                       request is recorded on it.
        """
        # Attributes must be set first: the base constructor processes the
        # request (and therefore calls `do_GET`) before returning.
        self._queue = event_queue
        self._req_queue = kwargs.get('req_queue')
        BaseHTTPRequestHandler.__init__(self, *args)

    @staticmethod
    def _format_event(event):
        """Serialize the known fields of an event as 'name: value' lines."""
        return ''.join(
            '%s: %s\n' % (name, event[name])
            for name in SSEHandler._EVENT_FIELDS
            if name in event
        )

    @staticmethod
    def _encode_chunk(chunk):
        """
        Frame a text payload as one HTTP chunk.

        The size prefix must count encoded bytes, not characters, otherwise
        any non-ASCII payload corrupts the chunked framing.
        """
        payload = chunk.encode('utf-8')
        return ('%X\r\n' % len(payload)).encode('ascii') + payload + b'\r\n'

    def do_GET(self):  # pylint:disable=invalid-name
        """
        Respond to a GET request.

        Blocks reading from the event queue, writing one chunk per event,
        until ``SSEMockServer.GRACEFUL_REQUEST_END`` is received (terminating
        chunk is written) or ``SSEMockServer.VIOLENT_REQUEST_END`` is received
        (an exception is raised without finishing the response).
        """
        self.send_response(200)
        self.send_header("Content-type", "text/event-stream")
        self.send_header("Transfer-Encoding", "chunked")
        self.send_header("Connection", "keep-alive")
        self.end_headers()

        if self._req_queue is not None:
            headers = dict(self.headers.items())
            self._req_queue.put(Request('GET', self.path, headers, None))

        while True:
            event = self._queue.get()
            if event == SSEMockServer.GRACEFUL_REQUEST_END:
                break
            if event == SSEMockServer.VIOLENT_REQUEST_END:
                # Deliberately abort mid-stream to simulate a broken connection.
                raise Exception('exploding')

            chunk = self._format_event(event)
            if chunk:
                # The extra line break closes the event with an empty line.
                self.wfile.write(self._encode_chunk(chunk + '\r\n'))

        # Zero-length chunk marks the end of the chunked body.
        self.wfile.write(b'0\r\n\r\n')


class SplitMockServer(object):
    """
    SDK server mock for testing purposes.

    Wraps an ``HTTPServer`` bound to an OS-assigned port on localhost and runs
    it on a daemon thread; requests are answered by ``SDKHandler`` using the
    canned responses supplied at construction time.
    """

    # NOTE(review): this attribute lives on the server wrapper, not on the
    # request handler class, so it presumably has no effect on the protocol
    # version used in responses -- confirm before relying on it.
    protocol_version = 'HTTP/1.1'

    def __init__(self, split_changes=None, segment_changes=None, req_queue=None):
        """
        Construct a mock server.

        :param split_changes: mapping of ``since`` change numbers to
                              splitChanges responses
        :type split_changes: dict
        :param segment_changes: mapping of ``(name, since)`` tuples to
                                segmentChanges responses
        :type segment_changes: dict
        :param req_queue: optional queue on which handlers record each
                          incoming request
        """
        # Fresh dicts per instance instead of shared mutable defaults.
        split_changes = split_changes if split_changes is not None else {}
        segment_changes = segment_changes if segment_changes is not None else {}
        # Port 0 lets the OS pick a free port; see `port()`.
        self._server = HTTPServer(
            ('localhost', 0),
            lambda *xs: SDKHandler(split_changes, segment_changes, *xs,
                                   req_queue=req_queue))
        self._server_thread = threading.Thread(target=self._blocking_run,
                                               name="SplitMockServer")
        # `Thread.setDaemon()` is deprecated; the attribute is the supported form.
        self._server_thread.daemon = True
        self._done_event = threading.Event()

    def _blocking_run(self):
        """Serve requests until shutdown, then signal completion."""
        try:
            self._server.serve_forever()
        finally:
            # Always release waiters, even if the serve loop dies unexpectedly.
            self._done_event.set()

    def port(self):
        """Return the assigned port."""
        return self._server.server_port

    def start(self):
        """Start the server asynchronously."""
        self._server_thread.start()

    def wait(self, timeout=None):
        """
        Wait for the server to shutdown.

        :param timeout: maximum number of seconds to wait (None = forever).
        :return: True if the server loop has finished, False on timeout.
        """
        return self._done_event.wait(timeout)

    def stop(self):
        """Stop the server and release its listening socket."""
        self._server.shutdown()
        # `shutdown()` only stops the serve loop; without `server_close()` the
        # listening socket stays open until garbage collection.
        self._server.server_close()


class SDKHandler(BaseHTTPRequestHandler):
    """
    Request handler emulating the SDK backend endpoints used in tests.

    GET serves canned splitChanges / segmentChanges payloads selected by the
    ``since`` (and ``name``) query parameters; POST acknowledges a fixed set
    of paths. When a request queue was supplied, every incoming request is
    recorded on it as a ``Request`` tuple before being answered.
    """

    # Paths for which POST answers 200; any other path gets a 404.
    _ACCEPTED_POST_PATHS = frozenset([
        '/api/testImpressions/bulk',
        '/testImpressions/count',
        '/api/events/bulk',
        '/metrics/times',
        '/metrics/count',
        '/metrics/gauge',
    ])

    def __init__(self, split_changes, segment_changes, *args, **kwargs):
        """
        Construct a handler.

        :param split_changes: mapping of ``since`` to splitChanges responses.
        :param segment_changes: mapping of ``(name, since)`` to segmentChanges
                                responses.
        :param args: positional arguments forwarded to the base handler.
        :param kwargs: only 'req_queue' is read; when present, each incoming
                       request is recorded on it.
        """
        # Attributes must be set first: the base constructor processes the
        # request (and therefore calls `do_GET`/`do_POST`) before returning.
        self._req_queue = kwargs.get('req_queue')
        self._split_changes = split_changes
        self._segment_changes = segment_changes
        BaseHTTPRequestHandler.__init__(self, *args)

    def _parse_qs(self):
        """
        Return the query-string parameters of the request path as a dict.

        Values are returned raw (no percent-decoding). A path without a query
        string yields an empty dict, and only the first '=' of each item
        separates the key from the value.
        """
        raw_query = self.path.partition('?')[2]
        params = {}
        for item in raw_query.split('&'):
            if not item:
                continue
            key, _, value = item.partition('=')
            params[key] = value
        return params

    def _respond_json(self, status, body=None):
        """Send a JSON response with the given status and optional text body."""
        self.send_response(status)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        if body is not None:
            self.wfile.write(body.encode('utf-8'))

    def _handle_segment_changes(self):
        """Answer a segmentChanges request: 400 without name, 404 if unknown."""
        qstring = self._parse_qs()
        since = int(qstring.get('since', -1))
        name = qstring.get('name')
        if name is None:
            self._respond_json(400, '{}')
            return

        to_send = self._segment_changes.get((name, since,))
        if to_send is None:
            self._respond_json(404, '{}')
            return

        self._respond_json(200, json.dumps(to_send))

    def _handle_split_changes(self):
        """Answer a splitChanges request: 404 if no response for ``since``."""
        qstring = self._parse_qs()
        since = int(qstring.get('since', -1))
        to_send = self._split_changes.get(since)
        if to_send is None:
            self._respond_json(404, '{}')
            return

        self._respond_json(200, json.dumps(to_send))

    def do_GET(self):  # pylint:disable=invalid-name
        """Respond to a GET request."""
        if self._req_queue is not None:
            headers = dict(self.headers.items())
            self._req_queue.put(Request('GET', self.path, headers, None))

        if self.path.startswith('/api/splitChanges'):
            self._handle_split_changes()
        elif self.path.startswith('/api/segmentChanges'):
            self._handle_segment_changes()
        else:
            self._respond_json(404)

    def do_POST(self):  # pylint:disable=invalid-name
        """Respond to a POST request."""
        # `.get()` replaces the Python 2-only `getheader()`; a missing header
        # is treated as an empty body.
        length = int(self.headers.get('content-length') or 0)
        # Always drain the body so unread data is not left on the socket.
        body = self.rfile.read(length) if length else None

        if self._req_queue is not None:
            headers = dict(self.headers.items())
            self._req_queue.put(Request('POST', self.path, headers, body))

        status = 200 if self.path in self._ACCEPTED_POST_PATHS else 404
        self._respond_json(status)
Empty file added tests/integration/__init__.py
Empty file.
91 changes: 0 additions & 91 deletions tests/push/mockserver.py

This file was deleted.

12 changes: 7 additions & 5 deletions tests/push/test_splitsse.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from splitio.models.token import Token
from splitio.push.splitsse import SplitSSEClient
from splitio.push.sse import SSEEvent

from .mockserver import SSEMockServer
from tests.helpers.mockserver import SSEMockServer


class SSEClientTests(object):
Expand Down Expand Up @@ -41,7 +40,9 @@ def handler(event):
time.sleep(1)
client.stop()

assert request_queue.get() == '/event-stream?v=1.1&accessToken=some&channels=chan1,[?occupancy=metrics.publishers]chan2'
request = request_queue.get(1)
assert request.path == '/event-stream?v=1.1&accessToken=some&channels=chan1,[?occupancy=metrics.publishers]chan2'
assert request.headers['accept'] == 'text/event-stream'

assert events == [
SSEEvent('1', 'message', '1', 'a'),
Expand Down Expand Up @@ -74,8 +75,9 @@ def handler(event):
with pytest.raises(Exception):
client.stop()

assert request_queue.get() == ('/event-stream?v=1.1&accessToken=some'
'&channels=chan1,[?occupancy=metrics.publishers]chan2')
request = request_queue.get(1)
assert request.path == '/event-stream?v=1.1&accessToken=some&channels=chan1,[?occupancy=metrics.publishers]chan2'
assert request.headers['accept'] == 'text/event-stream'

server.publish(SSEMockServer.VIOLENT_REQUEST_END)
server.stop()
2 changes: 1 addition & 1 deletion tests/push/test_sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import threading
import pytest
from splitio.push.sse import SSEClient, SSEEvent
from .mockserver import SSEMockServer
from tests.helpers.mockserver import SSEMockServer


class SSEClientTests(object):
Expand Down