|
1 | 1 | import asyncio |
2 | | -import time |
3 | 2 | import datetime |
| 3 | +import json |
| 4 | +import re |
| 5 | +import time |
4 | 6 |
|
5 | 7 | import pytest |
6 | 8 | from pymysql import util |
@@ -215,3 +217,47 @@ def test_rollback(connection, cursor): |
215 | 217 |
|
216 | 218 | # should not return any rows since no inserts was commited |
217 | 219 | assert len(data) == 0 |
| 220 | + |
| 221 | + |
| 222 | +def mysql_server_is(server_version, version_tuple): |
| 223 | + """Return True if the given connection is on the version given or |
| 224 | + greater. |
| 225 | + e.g.:: |
| 226 | + if self.mysql_server_is(conn, (5, 6, 4)): |
| 227 | + # do something for MySQL 5.6.4 and above |
| 228 | + """ |
| 229 | + server_version_tuple = tuple( |
| 230 | + (int(dig) if dig is not None else 0) |
| 231 | + for dig in |
| 232 | + re.match(r'(\d+)\.(\d+)\.(\d+)', server_version).group(1, 2, 3) |
| 233 | + ) |
| 234 | + return server_version_tuple >= version_tuple |
| 235 | + |
| 236 | + |
| 237 | +@pytest.mark.run_loop |
| 238 | +def test_json(connection_creator, table_cleanup): |
| 239 | + connection = yield from connection_creator( |
| 240 | + charset="utf8mb4", autocommit=True) |
| 241 | + server_info = connection.get_server_info() |
| 242 | + if not mysql_server_is(server_info, (5, 7, 0)): |
| 243 | + raise pytest.skip("JSON type is not supported on MySQL <= 5.6") |
| 244 | + |
| 245 | + cursor = yield from connection.cursor() |
| 246 | + yield from cursor.execute("""\ |
| 247 | + CREATE TABLE test_json ( |
| 248 | + id INT NOT NULL, |
| 249 | + json JSON NOT NULL, |
| 250 | + PRIMARY KEY (id) |
| 251 | + );""") |
| 252 | + table_cleanup("test_json") |
| 253 | + json_str = '{"hello": "こんにちは"}' |
| 254 | + yield from cursor.execute( |
| 255 | + "INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,)) |
| 256 | + yield from cursor.execute("SELECT `json` from `test_json` WHERE `id`=42") |
| 257 | + |
| 258 | + r = yield from cursor.fetchone() |
| 259 | + assert json.loads(r[0]) == json.loads(json_str) |
| 260 | + |
| 261 | + yield from cursor.execute("SELECT CAST(%s AS JSON) AS x", (json_str,)) |
| 262 | + r = yield from cursor.fetchone() |
| 263 | + assert json.loads(r[0]) == json.loads(json_str) |
0 commit comments