You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

387 lines
11KB

/*
 * Event loop thread
 *
 * Copyright Red Hat Inc., 2013
 *
 * Authors:
 *  Stefan Hajnoczi   <stefanha@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 *
 */
  13. #include "qemu/osdep.h"
  14. #include "qom/object.h"
  15. #include "qom/object_interfaces.h"
  16. #include "qemu/module.h"
  17. #include "block/aio.h"
  18. #include "block/block.h"
  19. #include "sysemu/iothread.h"
  20. #include "qapi/error.h"
  21. #include "qapi/qapi-commands-misc.h"
  22. #include "qemu/error-report.h"
  23. #include "qemu/rcu.h"
  24. #include "qemu/main-loop.h"
/* IOThread adds no class-level state, so the plain ObjectClass suffices */
typedef ObjectClass IOThreadClass;

#define IOTHREAD_GET_CLASS(obj) \
    OBJECT_GET_CLASS(IOThreadClass, obj, TYPE_IOTHREAD)
#define IOTHREAD_CLASS(klass) \
    OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)

#ifdef CONFIG_POSIX
/* Benchmark results from 2016 on NVMe SSD drives show max polling times around
 * 16-32 microseconds yield IOPS improvements for both iodepth=1 and iodepth=32
 * workloads.
 */
#define IOTHREAD_POLL_MAX_NS_DEFAULT 32768ULL
#else
/* No polling support on non-POSIX hosts: disabled by default */
#define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
#endif
  39. static __thread IOThread *my_iothread;
  40. AioContext *qemu_get_current_aio_context(void)
  41. {
  42. return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
  43. }
/*
 * Event loop thread body.  @opaque is the IOThread.  Runs until
 * iothread_stop_bh() clears iothread->running, then unwinds the
 * thread-default context and RCU registration in reverse order.
 */
static void *iothread_run(void *opaque)
{
    IOThread *iothread = opaque;

    rcu_register_thread();
    /*
     * g_main_context_push_thread_default() must be called before anything
     * in this new thread uses glib.
     */
    g_main_context_push_thread_default(iothread->worker_context);
    my_iothread = iothread;
    /* Publish our TID; iothread_complete() waits on init_done_sem for it */
    iothread->thread_id = qemu_get_thread_id();
    qemu_sem_post(&iothread->init_done_sem);

    while (iothread->running) {
        /*
         * Note: from functional-wise the g_main_loop_run() below can
         * already cover the aio_poll() events, but we can't run the
         * main loop unconditionally because explicit aio_poll() here
         * is faster than g_main_loop_run() when we do not need the
         * gcontext at all (e.g., pure block layer iothreads).  In
         * other words, when we want to run the gcontext with the
         * iothread we need to pay some performance for functionality.
         */
        aio_poll(iothread->ctx, true);

        /*
         * We must check the running state again in case it was
         * changed in previous aio_poll()
         */
        if (iothread->running && atomic_read(&iothread->run_gcontext)) {
            g_main_loop_run(iothread->main_loop);
        }
    }

    g_main_context_pop_thread_default(iothread->worker_context);
    rcu_unregister_thread();
    return NULL;
}
  79. /* Runs in iothread_run() thread */
  80. static void iothread_stop_bh(void *opaque)
  81. {
  82. IOThread *iothread = opaque;
  83. iothread->running = false; /* stop iothread_run() */
  84. if (iothread->main_loop) {
  85. g_main_loop_quit(iothread->main_loop);
  86. }
  87. }
  88. void iothread_stop(IOThread *iothread)
  89. {
  90. if (!iothread->ctx || iothread->stopping) {
  91. return;
  92. }
  93. iothread->stopping = true;
  94. aio_bh_schedule_oneshot(iothread->ctx, iothread_stop_bh, iothread);
  95. qemu_thread_join(&iothread->thread);
  96. }
  97. static void iothread_instance_init(Object *obj)
  98. {
  99. IOThread *iothread = IOTHREAD(obj);
  100. iothread->poll_max_ns = IOTHREAD_POLL_MAX_NS_DEFAULT;
  101. iothread->thread_id = -1;
  102. qemu_sem_init(&iothread->init_done_sem, 0);
  103. /* By default, we don't run gcontext */
  104. atomic_set(&iothread->run_gcontext, 0);
  105. }
/*
 * QOM finalizer: stop the event loop thread (if any) and release all
 * per-instance resources.  Safe on a partially-constructed instance:
 * iothread_stop() returns early when ctx was never created.
 */
static void iothread_instance_finalize(Object *obj)
{
    IOThread *iothread = IOTHREAD(obj);

    iothread_stop(iothread);

    /*
     * Before glib2 2.33.10, there is a glib2 bug that GSource context
     * pointer may not be cleared even if the context has already been
     * destroyed (while it should).  Here let's free the AIO context
     * earlier to bypass that glib bug.
     *
     * We can remove this comment after the minimum supported glib2
     * version boosts to 2.33.10.  Before that, let's free the
     * GSources first before destroying any GMainContext.
     */
    if (iothread->ctx) {
        aio_context_unref(iothread->ctx);
        iothread->ctx = NULL;
    }
    if (iothread->worker_context) {
        g_main_context_unref(iothread->worker_context);
        iothread->worker_context = NULL;
        /* main_loop is only ever created together with worker_context */
        g_main_loop_unref(iothread->main_loop);
        iothread->main_loop = NULL;
    }
    qemu_sem_destroy(&iothread->init_done_sem);
}
  132. static void iothread_init_gcontext(IOThread *iothread)
  133. {
  134. GSource *source;
  135. iothread->worker_context = g_main_context_new();
  136. source = aio_get_g_source(iothread_get_aio_context(iothread));
  137. g_source_attach(source, iothread->worker_context);
  138. g_source_unref(source);
  139. iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE);
  140. }
/*
 * UserCreatable::complete implementation: create the AioContext and its
 * GMainContext, apply the configured poll parameters, spawn the event
 * loop thread, and block until it has published its thread id.
 */
static void iothread_complete(UserCreatable *obj, Error **errp)
{
    Error *local_error = NULL;
    IOThread *iothread = IOTHREAD(obj);
    char *name, *thread_name;

    iothread->stopping = false;
    iothread->running = true;
    iothread->ctx = aio_context_new(&local_error);
    if (!iothread->ctx) {
        error_propagate(errp, local_error);
        return;
    }

    /*
     * Init one GMainContext for the iothread unconditionally, even if
     * it's not used
     */
    iothread_init_gcontext(iothread);

    aio_context_set_poll_params(iothread->ctx,
                                iothread->poll_max_ns,
                                iothread->poll_grow,
                                iothread->poll_shrink,
                                &local_error);
    if (local_error) {
        error_propagate(errp, local_error);
        /* drop ctx here; instance_finalize releases the gcontext pieces */
        aio_context_unref(iothread->ctx);
        iothread->ctx = NULL;
        return;
    }

    /* This assumes we are called from a thread with useful CPU affinity for us
     * to inherit.
     */
    name = object_get_canonical_path_component(OBJECT(obj));
    thread_name = g_strdup_printf("IO %s", name);
    qemu_thread_create(&iothread->thread, thread_name, iothread_run,
                       iothread, QEMU_THREAD_JOINABLE);
    g_free(thread_name);
    g_free(name);

    /* Wait for initialization to complete */
    while (iothread->thread_id == -1) {
        qemu_sem_wait(&iothread->init_done_sem);
    }
}
/*
 * Describes one int64 polling property: its external name and the byte
 * offset of the backing field inside IOThread.  Shared by the generic
 * iothread_get_poll_param()/iothread_set_poll_param() accessors.
 */
typedef struct {
    const char *name;
    ptrdiff_t offset; /* field's byte offset in IOThread struct */
} PollParamInfo;

static PollParamInfo poll_max_ns_info = {
    "poll-max-ns", offsetof(IOThread, poll_max_ns),
};
static PollParamInfo poll_grow_info = {
    "poll-grow", offsetof(IOThread, poll_grow),
};
static PollParamInfo poll_shrink_info = {
    "poll-shrink", offsetof(IOThread, poll_shrink),
};
  196. static void iothread_get_poll_param(Object *obj, Visitor *v,
  197. const char *name, void *opaque, Error **errp)
  198. {
  199. IOThread *iothread = IOTHREAD(obj);
  200. PollParamInfo *info = opaque;
  201. int64_t *field = (void *)iothread + info->offset;
  202. visit_type_int64(v, name, field, errp);
  203. }
  204. static void iothread_set_poll_param(Object *obj, Visitor *v,
  205. const char *name, void *opaque, Error **errp)
  206. {
  207. IOThread *iothread = IOTHREAD(obj);
  208. PollParamInfo *info = opaque;
  209. int64_t *field = (void *)iothread + info->offset;
  210. Error *local_err = NULL;
  211. int64_t value;
  212. visit_type_int64(v, name, &value, &local_err);
  213. if (local_err) {
  214. goto out;
  215. }
  216. if (value < 0) {
  217. error_setg(&local_err, "%s value must be in range [0, %"PRId64"]",
  218. info->name, INT64_MAX);
  219. goto out;
  220. }
  221. *field = value;
  222. if (iothread->ctx) {
  223. aio_context_set_poll_params(iothread->ctx,
  224. iothread->poll_max_ns,
  225. iothread->poll_grow,
  226. iothread->poll_shrink,
  227. &local_err);
  228. }
  229. out:
  230. error_propagate(errp, local_err);
  231. }
/*
 * Class initializer: hook up UserCreatable completion and expose the
 * three polling tunables, each routed through the shared accessors and
 * distinguished by its PollParamInfo opaque.
 */
static void iothread_class_init(ObjectClass *klass, void *class_data)
{
    UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
    ucc->complete = iothread_complete;

    object_class_property_add(klass, "poll-max-ns", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_max_ns_info, &error_abort);
    object_class_property_add(klass, "poll-grow", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_grow_info, &error_abort);
    object_class_property_add(klass, "poll-shrink", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_shrink_info, &error_abort);
}
/* QOM type registration record: TYPE_IOTHREAD, user-creatable via -object */
static const TypeInfo iothread_info = {
    .name = TYPE_IOTHREAD,
    .parent = TYPE_OBJECT,
    .class_init = iothread_class_init,
    .instance_size = sizeof(IOThread),
    .instance_init = iothread_instance_init,
    .instance_finalize = iothread_instance_finalize,
    .interfaces = (InterfaceInfo[]) {
        {TYPE_USER_CREATABLE},
        {}
    },
};
/* Register TYPE_IOTHREAD with QOM at module-init time */
static void iothread_register_types(void)
{
    type_register_static(&iothread_info);
}

type_init(iothread_register_types)
/*
 * Return the object id of @iothread as a newly allocated string; the
 * caller frees it with g_free() (as iothread_complete() does for the
 * same call).
 */
char *iothread_get_id(IOThread *iothread)
{
    return object_get_canonical_path_component(OBJECT(iothread));
}
/* Return the AioContext driven by @iothread's event loop */
AioContext *iothread_get_aio_context(IOThread *iothread)
{
    return iothread->ctx;
}
  274. static int query_one_iothread(Object *object, void *opaque)
  275. {
  276. IOThreadInfoList ***prev = opaque;
  277. IOThreadInfoList *elem;
  278. IOThreadInfo *info;
  279. IOThread *iothread;
  280. iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
  281. if (!iothread) {
  282. return 0;
  283. }
  284. info = g_new0(IOThreadInfo, 1);
  285. info->id = iothread_get_id(iothread);
  286. info->thread_id = iothread->thread_id;
  287. info->poll_max_ns = iothread->poll_max_ns;
  288. info->poll_grow = iothread->poll_grow;
  289. info->poll_shrink = iothread->poll_shrink;
  290. elem = g_new0(IOThreadInfoList, 1);
  291. elem->value = info;
  292. elem->next = NULL;
  293. **prev = elem;
  294. *prev = &elem->next;
  295. return 0;
  296. }
  297. IOThreadInfoList *qmp_query_iothreads(Error **errp)
  298. {
  299. IOThreadInfoList *head = NULL;
  300. IOThreadInfoList **prev = &head;
  301. Object *container = object_get_objects_root();
  302. object_child_foreach(container, query_one_iothread, &prev);
  303. return head;
  304. }
/*
 * Return @iothread's GMainContext, enabling glib source dispatch in its
 * loop.  run_gcontext is set before aio_notify() so that iothread_run()
 * sees the flag once the notify kicks it out of aio_poll().
 */
GMainContext *iothread_get_g_main_context(IOThread *iothread)
{
    atomic_set(&iothread->run_gcontext, 1);
    aio_notify(iothread->ctx);
    return iothread->worker_context;
}
  311. IOThread *iothread_create(const char *id, Error **errp)
  312. {
  313. Object *obj;
  314. obj = object_new_with_props(TYPE_IOTHREAD,
  315. object_get_internal_root(),
  316. id, errp, NULL);
  317. return IOTHREAD(obj);
  318. }
/*
 * Tear down an iothread_create() object by detaching it from its parent
 * container; the parent's reference is dropped as part of unparenting.
 */
void iothread_destroy(IOThread *iothread)
{
    object_unparent(OBJECT(iothread));
}
/* Lookup IOThread by its id.  Only finds user-created objects, not internal
 * iothread_create() objects. */
IOThread *iothread_by_id(const char *id)
{
    return IOTHREAD(object_resolve_path_type(id, TYPE_IOTHREAD, NULL));
}