Reputation: 43
I want to testing my tornado server
but, I don't know how...
I don't run my testing code...
please help me unit test on tornado
I don't speak English very well... I am sorry that I have poor questions.
this is my tornado server code.
import tornado.ioloop
import os
from tornado import web, gen
from concurrent.futures import ThreadPoolExecutor
from json_output import on_correct_text
import json
thread_pool = ThreadPoolExecutor()
class cer(tornado.web.RequestHandler):
_thread_pool = thread_pool
@gen.coroutine
def post(self, json_data):
### temporary json data
_json = {'file_name' : 'tmp_text.txt', 'file_text' : 'temp text...'}
json_data = json.dumps(_json)
tmp_json = {'file_name': '', 'text': '', 'upload_start_timestamp':'', 'upload_end_timestamp': '','process_start_timestamp': '', 'process_end_timestamp': '', 'response_time_second': '', 'status': "upload_waiting"}
json_d = json.loads(json_data)
print(json_d)
jsonString = yield on_correct_text(json_d, tmp_json)
self.set_header("Content-Type", "application/json")
self.write(jsonString)
self.finish()
def make_app():
return tornado.web.Application([
(r"/([a-zA-Z0-9.:_/%{}]+)", cer)
]
)
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.instance().start()
and this is json_output.py
import datetime, time, json
from concurrent.futures import ThreadPoolExecutor
from tornado import gen
thread_pool = ThreadPoolExecutor()
_thread_pool = thread_pool
@gen.coroutine
def on_correct_text(json_data, tmp_json):
file_up_starttime = datetime.datetime.now()
tmp_json['upload_start_timestamp'] = str(file_up_starttime)
tmp_json['status'] = "uploading"
try:
tmp_json['text'] = json_data['file_text']
tmp_json['file_name'] = json_data['file_name']
except:
tmp_json['status'] = "upload_faild"
print("upload falied")
jsonString = json.dumps(tmp_json, ensure_ascii=False)
return jsonString
start_time = time.time()
file_up_endtime = datetime.datetime.now()
tmp_json['upload_end_timestamp'] = str(file_up_endtime)
process_starttime = datetime.datetime.now()
tmp_json['process_start_timestamp'] = str(process_starttime)
tmp_json['status'] = "processing"
try:
process_endtime = datetime.datetime.now()
tmp_json['process_end_timestamp'] = str(process_endtime)
tmp_json['status'] = "complete"
except:
tmp_json['status'] = "process_faild"
print("process falied")
jsonString = json.dumps(tmp_json, ensure_ascii=False)
return jsonString
response_time = round((time.time() - start_time), 2)
tmp_json['response_time_second'] = str(response_time)
jsonString = json.dumps(tmp_json, ensure_ascii=False)
return jsonString
please help me...
I don't know how this tornado server unit test
Upvotes: 2
Views: 4677
Reputation: 2358
There is some excellent documentation here on how to setup Unit Testing in Tornado: http://www.tornadoweb.org/en/stable/testing.html
class cerTester(AsyncHTTPTestCase):
def get_app(self):
return make_app()
def get_url(self, path):
"""Returns an absolute url for the given path on the test server."""
return '%s://localhost:%s%s' % (self.get_protocol(),
self.get_http_port(), path)
def test_url(self):
#This is where your actual test will take place.
response = self.fetch('/sometest', method="POST")
assert response.code == 200
data = json.loads(response.body.decode('utf-8'))
assert data
#More Asserts
Upvotes: 3