Soryi
Soryi

Reputation: 43

How to unit test on tornado?

I want to test my Tornado server,

but, I don't know how...

I can't run my test code...

Please help me unit test my Tornado server.

I don't speak English very well... I am sorry that I have poor questions.

this is my tornado server code.

import tornado.ioloop
import os
from tornado import web, gen
from concurrent.futures import ThreadPoolExecutor
from json_output import on_correct_text
import json

# Executor created once at import time; the handler class stores a reference
# to it as ``_thread_pool``.
thread_pool = ThreadPoolExecutor()

class cer(tornado.web.RequestHandler):
    """Request handler that answers POST with the JSON built by on_correct_text."""

    # Reference to the module-level executor; post() itself does not use it.
    _thread_pool = thread_pool

    @gen.coroutine
    def post(self, json_data):
        """Write a JSON status document as the response body.

        Args:
            json_data: Text captured by the route's regex group. It is
                currently overwritten by the placeholder payload below.
        """
        ###  temporary json data
        _json = {'file_name' : 'tmp_text.txt', 'file_text' : 'temp text...'}
        json_data = json.dumps(_json)

        # Response template; on_correct_text fills these fields in.
        tmp_json = {
            'file_name': '',
            'text': '',
            'upload_start_timestamp': '',
            'upload_end_timestamp': '',
            'process_start_timestamp': '',
            'process_end_timestamp': '',
            'response_time_second': '',
            'status': "upload_waiting",
        }

        json_d = json.loads(json_data)
        print(json_d)

        jsonString = yield on_correct_text(json_d, tmp_json)

        self.set_header("Content-Type", "application/json")
        self.write(jsonString)
        self.finish()


def make_app():
    """Build the Tornado application that routes matching paths to ``cer``."""
    # Must live at module level (not inside the class body) so that the
    # ``__main__`` block and test cases can call ``make_app()`` directly.
    return tornado.web.Application([
        (r"/([a-zA-Z0-9.:_/%{}]+)", cer),
    ])

if __name__ == "__main__":
    # Build the application, bind it to port 8888, then block on the IOLoop.
    server_app = make_app()
    server_app.listen(8888)
    io_loop = tornado.ioloop.IOLoop.instance()
    io_loop.start()

and this is json_output.py

import datetime, time, json
from concurrent.futures import ThreadPoolExecutor
from tornado import gen

# One executor created at import time, bound to both a public and a private name.
thread_pool = _thread_pool = ThreadPoolExecutor()

@gen.coroutine
def on_correct_text(json_data, tmp_json):
    """Fill ``tmp_json`` with upload/processing metadata and return it as JSON.

    Args:
        json_data: Mapping read for the ``file_text`` and ``file_name`` keys.
        tmp_json: Status dict that is updated in place.

    Returns:
        ``tmp_json`` serialized with ``json.dumps(..., ensure_ascii=False)``.
        If reading ``json_data`` fails the status is ``upload_faild``; if the
        processing step fails it is ``process_faild``; otherwise ``complete``.
    """
    file_up_starttime = datetime.datetime.now()
    tmp_json['upload_start_timestamp'] = str(file_up_starttime)
    tmp_json['status'] = "uploading"
    try:
        tmp_json['text'] = json_data['file_text']
        tmp_json['file_name'] = json_data['file_name']
    except Exception:
        # Catch Exception rather than using a bare ``except`` so that
        # KeyboardInterrupt / SystemExit / GeneratorExit still propagate.
        tmp_json['status'] = "upload_faild"
        print("upload falied")
        jsonString = json.dumps(tmp_json, ensure_ascii=False)
        return jsonString

    # Wall-clock start used for the reported response time.
    start_time = time.time()

    file_up_endtime = datetime.datetime.now()
    tmp_json['upload_end_timestamp'] = str(file_up_endtime)

    process_starttime = datetime.datetime.now()
    tmp_json['process_start_timestamp'] = str(process_starttime)
    tmp_json['status'] = "processing"
    try:
        # NOTE(review): no real processing happens here yet; presumably this
        # is where the actual work is meant to go -- confirm with the author.
        process_endtime = datetime.datetime.now()
        tmp_json['process_end_timestamp'] = str(process_endtime)
        tmp_json['status'] = "complete"
    except Exception:
        tmp_json['status'] = "process_faild"
        print("process falied")
        jsonString = json.dumps(tmp_json, ensure_ascii=False)
        return jsonString

    # Elapsed seconds since start_time, rounded to two decimals and stored as text.
    response_time = round((time.time() - start_time), 2)
    tmp_json['response_time_second'] = str(response_time)

    jsonString = json.dumps(tmp_json, ensure_ascii=False)
    return jsonString

please help me...

I don't know how to unit test this Tornado server.

Upvotes: 2

Views: 4677

Answers (1)

Michael Robellard
Michael Robellard

Reputation: 2358

There is some excellent documentation here on how to set up unit testing in Tornado: http://www.tornadoweb.org/en/stable/testing.html

class cerTester(AsyncHTTPTestCase):
    """HTTP-level test case for the application returned by ``make_app()``."""

    def get_app(self):
        """Return the application the test server should serve."""
        return make_app()

    def get_url(self, path):
        """Returns an absolute url for the given path on the test server."""
        return '%s://localhost:%s%s' % (self.get_protocol(),
                                        self.get_http_port(), path)

    def test_url(self):
        """POST to the handler and check that it answers 200 with non-empty JSON."""
        #This is where your actual test will take place.
        # Tornado's HTTP client rejects a POST without a body, so send an
        # explicit empty one.
        response = self.fetch('/sometest', method="POST", body="")
        self.assertEqual(response.code, 200)

        data = json.loads(response.body.decode('utf-8'))
        self.assertTrue(data)
        #More Asserts

Upvotes: 3

Related Questions