Adding the program logic
Now that all the elements needed to implement the asynchronous reactive server are implemented, the application logic can be added. This part is implemented as a function that takes an object of observables as input and returns an object of observables as output. It is a component.
The following figure shows the reactivity diagram of this component:
The empty skeleton of this function is the following one:
def echo_server(source):
return {
'http': Observable.empty()
}
This component is implemented as a pure function. Its behavior depends only on the value of the input parameter; that is, the content of the http observable contained in the source object. For now, this function returns an empty observable. This will be completed once the body of the function is completed.
The first part of this component is the configuration and initialization of the HTTP server. This is done in a dedicated observable that contains the definition of the HTTP route and the request to start the server:
init = Observable.from_([
{
'what': 'add_route',
'methods': ['GET'],
'path': '/echo/{what}',
}, {
'what': 'start_server',
'host': 'localhost',
'port': 8080
}
])
The first item will add a new route for the GET method on the /echo/ path. Note the usage of a variable resource, the {what} part of the path, to retrieve the text to echo. The second items effectively start the server on localhost and port 8080.
The second part of the component consists of answering the echo requests. The echo requests come from the observable present in the http field of the source dictionary. The answer is simply built by mapping the source request item to a response item:
echo = source['http'] \
.map(lambda i: {
'what': 'response',
'status': 200,
'context': i['context'],
'data': i['match_info']['what'].encode('utf-8'),
})
The implementation is straightforward and consists of lambda. The returned item is a response item, with status 200 (the status code for OK in an HTTP response), and the data retrieved from the variable resource that was declared in the route. This value is retrieved from the match_info field of the request item. This text value must be encoded so that aiohttp can put it in the body of the response. The response items are available in the echo observable.
Now that all the logic is implemented, these two observables must be returned so that they can feed the HTTP driver. This is done with the merge operator:
return {
'http': Observable.merge(init, echo),
}
The full code of the echo component is the following one:
def echo_server(source):
init = Observable.from_([
{
'what': 'add_route',
'methods': ['GET'],
'path': '/echo/{what}',
}, {
'what': 'start_server',
'host': 'localhost',
'port': 8080
}
])
echo = source['http'] \
.map(lambda i: {
'what': 'response',
'status': 200,
'context': i['context'],
'data': i['match_info']['what'].encode('utf-8'),
})
return {
'http': Observable.merge(init, echo),
}
The full code of the server is available in the rx_http_echo.py script. It can be tested the same way as with the previous implementation of asyncio. Start the server in a terminal, and then in another Terminal, use curl to test it:
$ curl http://localhost:8080/echo/hello
hello
$ curl http://localhost:8080/echo/foo
foo
This implementation does not stop the server. As an exercise, you can add another route such as /exit that will ask the HTTP driver to stop the server.