You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

connection.c 36KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197
  1. /*-------------------------------------------------------------------------
  2. *
  3. * connection.c
  4. * Connection management functions for postgres_fdw
  5. *
  6. * Portions Copyright (c) 2012-2019, PostgreSQL Global Development Group
  7. *
  8. * IDENTIFICATION
  9. * contrib/postgres_fdw/connection.c
  10. *
  11. *-------------------------------------------------------------------------
  12. */
  13. #include "postgres.h"
  14. #include "postgres_fdw.h"
  15. #include "access/htup_details.h"
  16. #include "catalog/pg_user_mapping.h"
  17. #include "access/xact.h"
  18. #include "mb/pg_wchar.h"
  19. #include "miscadmin.h"
  20. #include "pgstat.h"
  21. #include "storage/latch.h"
  22. #include "utils/hsearch.h"
  23. #include "utils/inval.h"
  24. #include "utils/memutils.h"
  25. #include "utils/syscache.h"
  26. /*
  27. * Connection cache hash table entry
  28. *
  29. * The lookup key in this hash table is the user mapping OID. We use just one
  30. * connection per user mapping ID, which ensures that all the scans use the
  31. * same snapshot during a query. Using the user mapping OID rather than
  32. * the foreign server OID + user OID avoids creating multiple connections when
  33. * the public user mapping applies to all user OIDs.
  34. *
  35. * The "conn" pointer can be NULL if we don't currently have a live connection.
  36. * When we do have a connection, xact_depth tracks the current depth of
  37. * transactions and subtransactions open on the remote side. We need to issue
  38. * commands at the same nesting depth on the remote as we're executing at
  39. * ourselves, so that rolling back a subtransaction will kill the right
  40. * queries and not the wrong ones.
  41. */
  42. typedef Oid ConnCacheKey;
  43. typedef struct ConnCacheEntry
  44. {
  45. ConnCacheKey key; /* hash key (must be first) */
  46. PGconn *conn; /* connection to foreign server, or NULL */
  47. /* Remaining fields are invalid when conn is NULL: */
  48. int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
  49. * one level of subxact open, etc */
  50. bool have_prep_stmt; /* have we prepared any stmts in this xact? */
  51. bool have_error; /* have any subxacts aborted in this xact? */
  52. bool changing_xact_state; /* xact state change in process */
  53. bool invalidated; /* true if reconnect is pending */
  54. uint32 server_hashvalue; /* hash value of foreign server OID */
  55. uint32 mapping_hashvalue; /* hash value of user mapping OID */
  56. } ConnCacheEntry;
  57. /*
  58. * Connection cache (initialized on first use)
  59. */
  60. static HTAB *ConnectionHash = NULL;
  61. /* for assigning cursor numbers and prepared statement numbers */
  62. static unsigned int cursor_number = 0;
  63. static unsigned int prep_stmt_number = 0;
  64. /* tracks whether any work is needed in callback functions */
  65. static bool xact_got_connection = false;
  66. /* prototypes of private functions */
  67. static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
  68. static void disconnect_pg_server(ConnCacheEntry *entry);
  69. static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
  70. static void configure_remote_session(PGconn *conn);
  71. static void do_sql_command(PGconn *conn, const char *sql);
  72. static void begin_remote_xact(ConnCacheEntry *entry);
  73. static void pgfdw_xact_callback(XactEvent event, void *arg);
  74. static void pgfdw_subxact_callback(SubXactEvent event,
  75. SubTransactionId mySubid,
  76. SubTransactionId parentSubid,
  77. void *arg);
  78. static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
  79. static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
  80. static bool pgfdw_cancel_query(PGconn *conn);
  81. static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
  82. bool ignore_errors);
  83. static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
  84. PGresult **result);
  85. /*
  86. * Get a PGconn which can be used to execute queries on the remote PostgreSQL
  87. * server with the user's authorization. A new connection is established
  88. * if we don't already have a suitable one, and a transaction is opened at
  89. * the right subtransaction nesting depth if we didn't do that already.
  90. *
  91. * will_prep_stmt must be true if caller intends to create any prepared
  92. * statements. Since those don't go away automatically at transaction end
  93. * (not even on error), we need this flag to cue manual cleanup.
  94. */
  95. PGconn *
  96. GetConnection(UserMapping *user, bool will_prep_stmt)
  97. {
  98. bool found;
  99. ConnCacheEntry *entry;
  100. ConnCacheKey key;
  101. /* First time through, initialize connection cache hashtable */
  102. if (ConnectionHash == NULL)
  103. {
  104. HASHCTL ctl;
  105. MemSet(&ctl, 0, sizeof(ctl));
  106. ctl.keysize = sizeof(ConnCacheKey);
  107. ctl.entrysize = sizeof(ConnCacheEntry);
  108. /* allocate ConnectionHash in the cache context */
  109. ctl.hcxt = CacheMemoryContext;
  110. ConnectionHash = hash_create("postgres_fdw connections", 8,
  111. &ctl,
  112. HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
  113. /*
  114. * Register some callback functions that manage connection cleanup.
  115. * This should be done just once in each backend.
  116. */
  117. RegisterXactCallback(pgfdw_xact_callback, NULL);
  118. RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
  119. CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
  120. pgfdw_inval_callback, (Datum) 0);
  121. CacheRegisterSyscacheCallback(USERMAPPINGOID,
  122. pgfdw_inval_callback, (Datum) 0);
  123. }
  124. /* Set flag that we did GetConnection during the current transaction */
  125. xact_got_connection = true;
  126. /* Create hash key for the entry. Assume no pad bytes in key struct */
  127. key = user->umid;
  128. /*
  129. * Find or create cached entry for requested connection.
  130. */
  131. entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
  132. if (!found)
  133. {
  134. /*
  135. * We need only clear "conn" here; remaining fields will be filled
  136. * later when "conn" is set.
  137. */
  138. entry->conn = NULL;
  139. }
  140. /* Reject further use of connections which failed abort cleanup. */
  141. pgfdw_reject_incomplete_xact_state_change(entry);
  142. /*
  143. * If the connection needs to be remade due to invalidation, disconnect as
  144. * soon as we're out of all transactions.
  145. */
  146. if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
  147. {
  148. elog(DEBUG3, "closing connection %p for option changes to take effect",
  149. entry->conn);
  150. disconnect_pg_server(entry);
  151. }
  152. /*
  153. * We don't check the health of cached connection here, because it would
  154. * require some overhead. Broken connection will be detected when the
  155. * connection is actually used.
  156. */
  157. /*
  158. * If cache entry doesn't have a connection, we have to establish a new
  159. * connection. (If connect_pg_server throws an error, the cache entry
  160. * will remain in a valid empty state, ie conn == NULL.)
  161. */
  162. if (entry->conn == NULL)
  163. {
  164. ForeignServer *server = GetForeignServer(user->serverid);
  165. /* Reset all transient state fields, to be sure all are clean */
  166. entry->xact_depth = 0;
  167. entry->have_prep_stmt = false;
  168. entry->have_error = false;
  169. entry->changing_xact_state = false;
  170. entry->invalidated = false;
  171. entry->server_hashvalue =
  172. GetSysCacheHashValue1(FOREIGNSERVEROID,
  173. ObjectIdGetDatum(server->serverid));
  174. entry->mapping_hashvalue =
  175. GetSysCacheHashValue1(USERMAPPINGOID,
  176. ObjectIdGetDatum(user->umid));
  177. /* Now try to make the connection */
  178. entry->conn = connect_pg_server(server, user);
  179. elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
  180. entry->conn, server->servername, user->umid, user->userid);
  181. }
  182. /*
  183. * Start a new transaction or subtransaction if needed.
  184. */
  185. begin_remote_xact(entry);
  186. /* Remember if caller will prepare statements */
  187. entry->have_prep_stmt |= will_prep_stmt;
  188. return entry->conn;
  189. }
  190. /*
  191. * Connect to remote server using specified server and user mapping properties.
  192. */
  193. static PGconn *
  194. connect_pg_server(ForeignServer *server, UserMapping *user)
  195. {
  196. PGconn *volatile conn = NULL;
  197. /*
  198. * Use PG_TRY block to ensure closing connection on error.
  199. */
  200. PG_TRY();
  201. {
  202. const char **keywords;
  203. const char **values;
  204. int n;
  205. /*
  206. * Construct connection params from generic options of ForeignServer
  207. * and UserMapping. (Some of them might not be libpq options, in
  208. * which case we'll just waste a few array slots.) Add 3 extra slots
  209. * for fallback_application_name, client_encoding, end marker.
  210. */
  211. n = list_length(server->options) + list_length(user->options) + 3;
  212. keywords = (const char **) palloc(n * sizeof(char *));
  213. values = (const char **) palloc(n * sizeof(char *));
  214. n = 0;
  215. n += ExtractConnectionOptions(server->options,
  216. keywords + n, values + n);
  217. n += ExtractConnectionOptions(user->options,
  218. keywords + n, values + n);
  219. /* Use "postgres_fdw" as fallback_application_name. */
  220. keywords[n] = "fallback_application_name";
  221. values[n] = "postgres_fdw";
  222. n++;
  223. /* Set client_encoding so that libpq can convert encoding properly. */
  224. keywords[n] = "client_encoding";
  225. values[n] = GetDatabaseEncodingName();
  226. n++;
  227. keywords[n] = values[n] = NULL;
  228. /* verify connection parameters and make connection */
  229. check_conn_params(keywords, values, user);
  230. conn = PQconnectdbParams(keywords, values, false);
  231. if (!conn || PQstatus(conn) != CONNECTION_OK)
  232. ereport(ERROR,
  233. (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
  234. errmsg("could not connect to server \"%s\"",
  235. server->servername),
  236. errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
  237. /*
  238. * Check that non-superuser has used password to establish connection;
  239. * otherwise, he's piggybacking on the postgres server's user
  240. * identity. See also dblink_security_check() in contrib/dblink.
  241. */
  242. if (!superuser_arg(user->userid) && !PQconnectionUsedPassword(conn))
  243. ereport(ERROR,
  244. (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
  245. errmsg("password is required"),
  246. errdetail("Non-superuser cannot connect if the server does not request a password."),
  247. errhint("Target server's authentication method must be changed.")));
  248. /* Prepare new session for use */
  249. configure_remote_session(conn);
  250. pfree(keywords);
  251. pfree(values);
  252. }
  253. PG_CATCH();
  254. {
  255. /* Release PGconn data structure if we managed to create one */
  256. if (conn)
  257. PQfinish(conn);
  258. PG_RE_THROW();
  259. }
  260. PG_END_TRY();
  261. return conn;
  262. }
  263. /*
  264. * Disconnect any open connection for a connection cache entry.
  265. */
  266. static void
  267. disconnect_pg_server(ConnCacheEntry *entry)
  268. {
  269. if (entry->conn != NULL)
  270. {
  271. PQfinish(entry->conn);
  272. entry->conn = NULL;
  273. }
  274. }
  275. /*
  276. * For non-superusers, insist that the connstr specify a password. This
  277. * prevents a password from being picked up from .pgpass, a service file,
  278. * the environment, etc. We don't want the postgres user's passwords
  279. * to be accessible to non-superusers. (See also dblink_connstr_check in
  280. * contrib/dblink.)
  281. */
  282. static void
  283. check_conn_params(const char **keywords, const char **values, UserMapping *user)
  284. {
  285. int i;
  286. /* no check required if superuser */
  287. if (superuser_arg(user->userid))
  288. return;
  289. /* ok if params contain a non-empty password */
  290. for (i = 0; keywords[i] != NULL; i++)
  291. {
  292. if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
  293. return;
  294. }
  295. ereport(ERROR,
  296. (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
  297. errmsg("password is required"),
  298. errdetail("Non-superusers must provide a password in the user mapping.")));
  299. }
  300. /*
  301. * Issue SET commands to make sure remote session is configured properly.
  302. *
  303. * We do this just once at connection, assuming nothing will change the
  304. * values later. Since we'll never send volatile function calls to the
  305. * remote, there shouldn't be any way to break this assumption from our end.
  306. * It's possible to think of ways to break it at the remote end, eg making
  307. * a foreign table point to a view that includes a set_config call ---
  308. * but once you admit the possibility of a malicious view definition,
  309. * there are any number of ways to break things.
  310. */
  311. static void
  312. configure_remote_session(PGconn *conn)
  313. {
  314. int remoteversion = PQserverVersion(conn);
  315. /* Force the search path to contain only pg_catalog (see deparse.c) */
  316. do_sql_command(conn, "SET search_path = pg_catalog");
  317. /*
  318. * Set remote timezone; this is basically just cosmetic, since all
  319. * transmitted and returned timestamptzs should specify a zone explicitly
  320. * anyway. However it makes the regression test outputs more predictable.
  321. *
  322. * We don't risk setting remote zone equal to ours, since the remote
  323. * server might use a different timezone database. Instead, use UTC
  324. * (quoted, because very old servers are picky about case).
  325. */
  326. do_sql_command(conn, "SET timezone = 'UTC'");
  327. /*
  328. * Set values needed to ensure unambiguous data output from remote. (This
  329. * logic should match what pg_dump does. See also set_transmission_modes
  330. * in postgres_fdw.c.)
  331. */
  332. do_sql_command(conn, "SET datestyle = ISO");
  333. if (remoteversion >= 80400)
  334. do_sql_command(conn, "SET intervalstyle = postgres");
  335. if (remoteversion >= 90000)
  336. do_sql_command(conn, "SET extra_float_digits = 3");
  337. else
  338. do_sql_command(conn, "SET extra_float_digits = 2");
  339. }
  340. /*
  341. * Convenience subroutine to issue a non-data-returning SQL command to remote
  342. */
  343. static void
  344. do_sql_command(PGconn *conn, const char *sql)
  345. {
  346. PGresult *res;
  347. if (!PQsendQuery(conn, sql))
  348. pgfdw_report_error(ERROR, NULL, conn, false, sql);
  349. res = pgfdw_get_result(conn, sql);
  350. if (PQresultStatus(res) != PGRES_COMMAND_OK)
  351. pgfdw_report_error(ERROR, res, conn, true, sql);
  352. PQclear(res);
  353. }
  354. /*
  355. * Start remote transaction or subtransaction, if needed.
  356. *
  357. * Note that we always use at least REPEATABLE READ in the remote session.
  358. * This is so that, if a query initiates multiple scans of the same or
  359. * different foreign tables, we will get snapshot-consistent results from
  360. * those scans. A disadvantage is that we can't provide sane emulation of
  361. * READ COMMITTED behavior --- it would be nice if we had some other way to
  362. * control which remote queries share a snapshot.
  363. */
  364. static void
  365. begin_remote_xact(ConnCacheEntry *entry)
  366. {
  367. int curlevel = GetCurrentTransactionNestLevel();
  368. /* Start main transaction if we haven't yet */
  369. if (entry->xact_depth <= 0)
  370. {
  371. const char *sql;
  372. elog(DEBUG3, "starting remote transaction on connection %p",
  373. entry->conn);
  374. if (IsolationIsSerializable())
  375. sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
  376. else
  377. sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
  378. entry->changing_xact_state = true;
  379. do_sql_command(entry->conn, sql);
  380. entry->xact_depth = 1;
  381. entry->changing_xact_state = false;
  382. }
  383. /*
  384. * If we're in a subtransaction, stack up savepoints to match our level.
  385. * This ensures we can rollback just the desired effects when a
  386. * subtransaction aborts.
  387. */
  388. while (entry->xact_depth < curlevel)
  389. {
  390. char sql[64];
  391. snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
  392. entry->changing_xact_state = true;
  393. do_sql_command(entry->conn, sql);
  394. entry->xact_depth++;
  395. entry->changing_xact_state = false;
  396. }
  397. }
  398. /*
  399. * Release connection reference count created by calling GetConnection.
  400. */
  401. void
  402. ReleaseConnection(PGconn *conn)
  403. {
  404. /*
  405. * Currently, we don't actually track connection references because all
  406. * cleanup is managed on a transaction or subtransaction basis instead. So
  407. * there's nothing to do here.
  408. */
  409. }
  410. /*
  411. * Assign a "unique" number for a cursor.
  412. *
  413. * These really only need to be unique per connection within a transaction.
  414. * For the moment we ignore the per-connection point and assign them across
  415. * all connections in the transaction, but we ask for the connection to be
  416. * supplied in case we want to refine that.
  417. *
  418. * Note that even if wraparound happens in a very long transaction, actual
  419. * collisions are highly improbable; just be sure to use %u not %d to print.
  420. */
  421. unsigned int
  422. GetCursorNumber(PGconn *conn)
  423. {
  424. return ++cursor_number;
  425. }
  426. /*
  427. * Assign a "unique" number for a prepared statement.
  428. *
  429. * This works much like GetCursorNumber, except that we never reset the counter
  430. * within a session. That's because we can't be 100% sure we've gotten rid
  431. * of all prepared statements on all connections, and it's not really worth
  432. * increasing the risk of prepared-statement name collisions by resetting.
  433. */
  434. unsigned int
  435. GetPrepStmtNumber(PGconn *conn)
  436. {
  437. return ++prep_stmt_number;
  438. }
  439. /*
  440. * Submit a query and wait for the result.
  441. *
  442. * This function is interruptible by signals.
  443. *
  444. * Caller is responsible for the error handling on the result.
  445. */
  446. PGresult *
  447. pgfdw_exec_query(PGconn *conn, const char *query)
  448. {
  449. /*
  450. * Submit a query. Since we don't use non-blocking mode, this also can
  451. * block. But its risk is relatively small, so we ignore that for now.
  452. */
  453. if (!PQsendQuery(conn, query))
  454. pgfdw_report_error(ERROR, NULL, conn, false, query);
  455. /* Wait for the result. */
  456. return pgfdw_get_result(conn, query);
  457. }
  458. /*
  459. * Wait for the result from a prior asynchronous execution function call.
  460. *
  461. * This function offers quick responsiveness by checking for any interruptions.
  462. *
  463. * This function emulates PQexec()'s behavior of returning the last result
  464. * when there are many.
  465. *
  466. * Caller is responsible for the error handling on the result.
  467. */
  468. PGresult *
  469. pgfdw_get_result(PGconn *conn, const char *query)
  470. {
  471. PGresult *volatile last_res = NULL;
  472. /* In what follows, do not leak any PGresults on an error. */
  473. PG_TRY();
  474. {
  475. for (;;)
  476. {
  477. PGresult *res;
  478. while (PQisBusy(conn))
  479. {
  480. int wc;
  481. /* Sleep until there's something to do */
  482. wc = WaitLatchOrSocket(MyLatch,
  483. WL_LATCH_SET | WL_SOCKET_READABLE |
  484. WL_EXIT_ON_PM_DEATH,
  485. PQsocket(conn),
  486. -1L, PG_WAIT_EXTENSION);
  487. ResetLatch(MyLatch);
  488. CHECK_FOR_INTERRUPTS();
  489. /* Data available in socket? */
  490. if (wc & WL_SOCKET_READABLE)
  491. {
  492. if (!PQconsumeInput(conn))
  493. pgfdw_report_error(ERROR, NULL, conn, false, query);
  494. }
  495. }
  496. res = PQgetResult(conn);
  497. if (res == NULL)
  498. break; /* query is complete */
  499. PQclear(last_res);
  500. last_res = res;
  501. }
  502. }
  503. PG_CATCH();
  504. {
  505. PQclear(last_res);
  506. PG_RE_THROW();
  507. }
  508. PG_END_TRY();
  509. return last_res;
  510. }
  511. /*
  512. * Report an error we got from the remote server.
  513. *
  514. * elevel: error level to use (typically ERROR, but might be less)
  515. * res: PGresult containing the error
  516. * conn: connection we did the query on
  517. * clear: if true, PQclear the result (otherwise caller will handle it)
  518. * sql: NULL, or text of remote command we tried to execute
  519. *
  520. * Note: callers that choose not to throw ERROR for a remote error are
  521. * responsible for making sure that the associated ConnCacheEntry gets
  522. * marked with have_error = true.
  523. */
  524. void
  525. pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
  526. bool clear, const char *sql)
  527. {
  528. /* If requested, PGresult must be released before leaving this function. */
  529. PG_TRY();
  530. {
  531. char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
  532. char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
  533. char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
  534. char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
  535. char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
  536. int sqlstate;
  537. if (diag_sqlstate)
  538. sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
  539. diag_sqlstate[1],
  540. diag_sqlstate[2],
  541. diag_sqlstate[3],
  542. diag_sqlstate[4]);
  543. else
  544. sqlstate = ERRCODE_CONNECTION_FAILURE;
  545. /*
  546. * If we don't get a message from the PGresult, try the PGconn. This
  547. * is needed because for connection-level failures, PQexec may just
  548. * return NULL, not a PGresult at all.
  549. */
  550. if (message_primary == NULL)
  551. message_primary = pchomp(PQerrorMessage(conn));
  552. ereport(elevel,
  553. (errcode(sqlstate),
  554. message_primary ? errmsg_internal("%s", message_primary) :
  555. errmsg("could not obtain message string for remote error"),
  556. message_detail ? errdetail_internal("%s", message_detail) : 0,
  557. message_hint ? errhint("%s", message_hint) : 0,
  558. message_context ? errcontext("%s", message_context) : 0,
  559. sql ? errcontext("remote SQL command: %s", sql) : 0));
  560. }
  561. PG_CATCH();
  562. {
  563. if (clear)
  564. PQclear(res);
  565. PG_RE_THROW();
  566. }
  567. PG_END_TRY();
  568. if (clear)
  569. PQclear(res);
  570. }
  571. /*
  572. * pgfdw_xact_callback --- cleanup at main-transaction end.
  573. */
  574. static void
  575. pgfdw_xact_callback(XactEvent event, void *arg)
  576. {
  577. HASH_SEQ_STATUS scan;
  578. ConnCacheEntry *entry;
  579. /* Quick exit if no connections were touched in this transaction. */
  580. if (!xact_got_connection)
  581. return;
  582. /*
  583. * Scan all connection cache entries to find open remote transactions, and
  584. * close them.
  585. */
  586. hash_seq_init(&scan, ConnectionHash);
  587. while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
  588. {
  589. PGresult *res;
  590. /* Ignore cache entry if no open connection right now */
  591. if (entry->conn == NULL)
  592. continue;
  593. /* If it has an open remote transaction, try to close it */
  594. if (entry->xact_depth > 0)
  595. {
  596. bool abort_cleanup_failure = false;
  597. elog(DEBUG3, "closing remote transaction on connection %p",
  598. entry->conn);
  599. switch (event)
  600. {
  601. case XACT_EVENT_PARALLEL_PRE_COMMIT:
  602. case XACT_EVENT_PRE_COMMIT:
  603. /*
  604. * If abort cleanup previously failed for this connection,
  605. * we can't issue any more commands against it.
  606. */
  607. pgfdw_reject_incomplete_xact_state_change(entry);
  608. /* Commit all remote transactions during pre-commit */
  609. entry->changing_xact_state = true;
  610. do_sql_command(entry->conn, "COMMIT TRANSACTION");
  611. entry->changing_xact_state = false;
  612. /*
  613. * If there were any errors in subtransactions, and we
  614. * made prepared statements, do a DEALLOCATE ALL to make
  615. * sure we get rid of all prepared statements. This is
  616. * annoying and not terribly bulletproof, but it's
  617. * probably not worth trying harder.
  618. *
  619. * DEALLOCATE ALL only exists in 8.3 and later, so this
  620. * constrains how old a server postgres_fdw can
  621. * communicate with. We intentionally ignore errors in
  622. * the DEALLOCATE, so that we can hobble along to some
  623. * extent with older servers (leaking prepared statements
  624. * as we go; but we don't really support update operations
  625. * pre-8.3 anyway).
  626. */
  627. if (entry->have_prep_stmt && entry->have_error)
  628. {
  629. res = PQexec(entry->conn, "DEALLOCATE ALL");
  630. PQclear(res);
  631. }
  632. entry->have_prep_stmt = false;
  633. entry->have_error = false;
  634. break;
  635. case XACT_EVENT_PRE_PREPARE:
  636. /*
  637. * We disallow remote transactions that modified anything,
  638. * since it's not very reasonable to hold them open until
  639. * the prepared transaction is committed. For the moment,
  640. * throw error unconditionally; later we might allow
  641. * read-only cases. Note that the error will cause us to
  642. * come right back here with event == XACT_EVENT_ABORT, so
  643. * we'll clean up the connection state at that point.
  644. */
  645. ereport(ERROR,
  646. (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
  647. errmsg("cannot prepare a transaction that modified remote tables")));
  648. break;
  649. case XACT_EVENT_PARALLEL_COMMIT:
  650. case XACT_EVENT_COMMIT:
  651. case XACT_EVENT_PREPARE:
  652. /* Pre-commit should have closed the open transaction */
  653. elog(ERROR, "missed cleaning up connection during pre-commit");
  654. break;
  655. case XACT_EVENT_PARALLEL_ABORT:
  656. case XACT_EVENT_ABORT:
  657. /*
  658. * Don't try to clean up the connection if we're already
  659. * in error recursion trouble.
  660. */
  661. if (in_error_recursion_trouble())
  662. entry->changing_xact_state = true;
  663. /*
  664. * If connection is already unsalvageable, don't touch it
  665. * further.
  666. */
  667. if (entry->changing_xact_state)
  668. break;
  669. /*
  670. * Mark this connection as in the process of changing
  671. * transaction state.
  672. */
  673. entry->changing_xact_state = true;
  674. /* Assume we might have lost track of prepared statements */
  675. entry->have_error = true;
  676. /*
  677. * If a command has been submitted to the remote server by
  678. * using an asynchronous execution function, the command
  679. * might not have yet completed. Check to see if a
  680. * command is still being processed by the remote server,
  681. * and if so, request cancellation of the command.
  682. */
  683. if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
  684. !pgfdw_cancel_query(entry->conn))
  685. {
  686. /* Unable to cancel running query. */
  687. abort_cleanup_failure = true;
  688. }
  689. else if (!pgfdw_exec_cleanup_query(entry->conn,
  690. "ABORT TRANSACTION",
  691. false))
  692. {
  693. /* Unable to abort remote transaction. */
  694. abort_cleanup_failure = true;
  695. }
  696. else if (entry->have_prep_stmt && entry->have_error &&
  697. !pgfdw_exec_cleanup_query(entry->conn,
  698. "DEALLOCATE ALL",
  699. true))
  700. {
  701. /* Trouble clearing prepared statements. */
  702. abort_cleanup_failure = true;
  703. }
  704. else
  705. {
  706. entry->have_prep_stmt = false;
  707. entry->have_error = false;
  708. }
  709. /* Disarm changing_xact_state if it all worked. */
  710. entry->changing_xact_state = abort_cleanup_failure;
  711. break;
  712. }
  713. }
  714. /* Reset state to show we're out of a transaction */
  715. entry->xact_depth = 0;
  716. /*
  717. * If the connection isn't in a good idle state, discard it to
  718. * recover. Next GetConnection will open a new connection.
  719. */
  720. if (PQstatus(entry->conn) != CONNECTION_OK ||
  721. PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
  722. entry->changing_xact_state)
  723. {
  724. elog(DEBUG3, "discarding connection %p", entry->conn);
  725. disconnect_pg_server(entry);
  726. }
  727. }
  728. /*
  729. * Regardless of the event type, we can now mark ourselves as out of the
  730. * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
  731. * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
  732. */
  733. xact_got_connection = false;
  734. /* Also reset cursor numbering for next transaction */
  735. cursor_number = 0;
  736. }
  737. /*
  738. * pgfdw_subxact_callback --- cleanup at subtransaction end.
  739. */
  740. static void
  741. pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
  742. SubTransactionId parentSubid, void *arg)
  743. {
  744. HASH_SEQ_STATUS scan;
  745. ConnCacheEntry *entry;
  746. int curlevel;
  747. /* Nothing to do at subxact start, nor after commit. */
  748. if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
  749. event == SUBXACT_EVENT_ABORT_SUB))
  750. return;
  751. /* Quick exit if no connections were touched in this transaction. */
  752. if (!xact_got_connection)
  753. return;
  754. /*
  755. * Scan all connection cache entries to find open remote subtransactions
  756. * of the current level, and close them.
  757. */
  758. curlevel = GetCurrentTransactionNestLevel();
  759. hash_seq_init(&scan, ConnectionHash);
  760. while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
  761. {
  762. char sql[100];
  763. /*
  764. * We only care about connections with open remote subtransactions of
  765. * the current level.
  766. */
  767. if (entry->conn == NULL || entry->xact_depth < curlevel)
  768. continue;
  769. if (entry->xact_depth > curlevel)
  770. elog(ERROR, "missed cleaning up remote subtransaction at level %d",
  771. entry->xact_depth);
  772. if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
  773. {
  774. /*
  775. * If abort cleanup previously failed for this connection, we
  776. * can't issue any more commands against it.
  777. */
  778. pgfdw_reject_incomplete_xact_state_change(entry);
  779. /* Commit all remote subtransactions during pre-commit */
  780. snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
  781. entry->changing_xact_state = true;
  782. do_sql_command(entry->conn, sql);
  783. entry->changing_xact_state = false;
  784. }
  785. else if (in_error_recursion_trouble())
  786. {
  787. /*
  788. * Don't try to clean up the connection if we're already in error
  789. * recursion trouble.
  790. */
  791. entry->changing_xact_state = true;
  792. }
  793. else if (!entry->changing_xact_state)
  794. {
  795. bool abort_cleanup_failure = false;
  796. /* Remember that abort cleanup is in progress. */
  797. entry->changing_xact_state = true;
  798. /* Assume we might have lost track of prepared statements */
  799. entry->have_error = true;
  800. /*
  801. * If a command has been submitted to the remote server by using
  802. * an asynchronous execution function, the command might not have
  803. * yet completed. Check to see if a command is still being
  804. * processed by the remote server, and if so, request cancellation
  805. * of the command.
  806. */
  807. if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
  808. !pgfdw_cancel_query(entry->conn))
  809. abort_cleanup_failure = true;
  810. else
  811. {
  812. /* Rollback all remote subtransactions during abort */
  813. snprintf(sql, sizeof(sql),
  814. "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
  815. curlevel, curlevel);
  816. if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
  817. abort_cleanup_failure = true;
  818. }
  819. /* Disarm changing_xact_state if it all worked. */
  820. entry->changing_xact_state = abort_cleanup_failure;
  821. }
  822. /* OK, we're outta that level of subtransaction */
  823. entry->xact_depth--;
  824. }
  825. }
  826. /*
  827. * Connection invalidation callback function
  828. *
  829. * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
  830. * mark connections depending on that entry as needing to be remade.
  831. * We can't immediately destroy them, since they might be in the midst of
  832. * a transaction, but we'll remake them at the next opportunity.
  833. *
  834. * Although most cache invalidation callbacks blow away all the related stuff
  835. * regardless of the given hashvalue, connections are expensive enough that
  836. * it's worth trying to avoid that.
  837. *
  838. * NB: We could avoid unnecessary disconnection more strictly by examining
  839. * individual option values, but it seems too much effort for the gain.
  840. */
  841. static void
  842. pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
  843. {
  844. HASH_SEQ_STATUS scan;
  845. ConnCacheEntry *entry;
  846. Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
  847. /* ConnectionHash must exist already, if we're registered */
  848. hash_seq_init(&scan, ConnectionHash);
  849. while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
  850. {
  851. /* Ignore invalid entries */
  852. if (entry->conn == NULL)
  853. continue;
  854. /* hashvalue == 0 means a cache reset, must clear all state */
  855. if (hashvalue == 0 ||
  856. (cacheid == FOREIGNSERVEROID &&
  857. entry->server_hashvalue == hashvalue) ||
  858. (cacheid == USERMAPPINGOID &&
  859. entry->mapping_hashvalue == hashvalue))
  860. entry->invalidated = true;
  861. }
  862. }
  863. /*
  864. * Raise an error if the given connection cache entry is marked as being
  865. * in the middle of an xact state change. This should be called at which no
  866. * such change is expected to be in progress; if one is found to be in
  867. * progress, it means that we aborted in the middle of a previous state change
  868. * and now don't know what the remote transaction state actually is.
  869. * Such connections can't safely be further used. Re-establishing the
  870. * connection would change the snapshot and roll back any writes already
  871. * performed, so that's not an option, either. Thus, we must abort.
  872. */
  873. static void
  874. pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
  875. {
  876. HeapTuple tup;
  877. Form_pg_user_mapping umform;
  878. ForeignServer *server;
  879. /* nothing to do for inactive entries and entries of sane state */
  880. if (entry->conn == NULL || !entry->changing_xact_state)
  881. return;
  882. /* make sure this entry is inactive */
  883. disconnect_pg_server(entry);
  884. /* find server name to be shown in the message below */
  885. tup = SearchSysCache1(USERMAPPINGOID,
  886. ObjectIdGetDatum(entry->key));
  887. if (!HeapTupleIsValid(tup))
  888. elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
  889. umform = (Form_pg_user_mapping) GETSTRUCT(tup);
  890. server = GetForeignServer(umform->umserver);
  891. ReleaseSysCache(tup);
  892. ereport(ERROR,
  893. (errcode(ERRCODE_CONNECTION_EXCEPTION),
  894. errmsg("connection to server \"%s\" was lost",
  895. server->servername)));
  896. }
  897. /*
  898. * Cancel the currently-in-progress query (whose query text we do not have)
  899. * and ignore the result. Returns true if we successfully cancel the query
  900. * and discard any pending result, and false if not.
  901. */
  902. static bool
  903. pgfdw_cancel_query(PGconn *conn)
  904. {
  905. PGcancel *cancel;
  906. char errbuf[256];
  907. PGresult *result = NULL;
  908. TimestampTz endtime;
  909. /*
  910. * If it takes too long to cancel the query and discard the result, assume
  911. * the connection is dead.
  912. */
  913. endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
  914. /*
  915. * Issue cancel request. Unfortunately, there's no good way to limit the
  916. * amount of time that we might block inside PQgetCancel().
  917. */
  918. if ((cancel = PQgetCancel(conn)))
  919. {
  920. if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
  921. {
  922. ereport(WARNING,
  923. (errcode(ERRCODE_CONNECTION_FAILURE),
  924. errmsg("could not send cancel request: %s",
  925. errbuf)));
  926. PQfreeCancel(cancel);
  927. return false;
  928. }
  929. PQfreeCancel(cancel);
  930. }
  931. /* Get and discard the result of the query. */
  932. if (pgfdw_get_cleanup_result(conn, endtime, &result))
  933. return false;
  934. PQclear(result);
  935. return true;
  936. }
  937. /*
  938. * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
  939. * result. If the query is executed without error, the return value is true.
  940. * If the query is executed successfully but returns an error, the return
  941. * value is true if and only if ignore_errors is set. If the query can't be
  942. * sent or times out, the return value is false.
  943. */
  944. static bool
  945. pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
  946. {
  947. PGresult *result = NULL;
  948. TimestampTz endtime;
  949. /*
  950. * If it takes too long to execute a cleanup query, assume the connection
  951. * is dead. It's fairly likely that this is why we aborted in the first
  952. * place (e.g. statement timeout, user cancel), so the timeout shouldn't
  953. * be too long.
  954. */
  955. endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
  956. /*
  957. * Submit a query. Since we don't use non-blocking mode, this also can
  958. * block. But its risk is relatively small, so we ignore that for now.
  959. */
  960. if (!PQsendQuery(conn, query))
  961. {
  962. pgfdw_report_error(WARNING, NULL, conn, false, query);
  963. return false;
  964. }
  965. /* Get the result of the query. */
  966. if (pgfdw_get_cleanup_result(conn, endtime, &result))
  967. return false;
  968. /* Issue a warning if not successful. */
  969. if (PQresultStatus(result) != PGRES_COMMAND_OK)
  970. {
  971. pgfdw_report_error(WARNING, result, conn, true, query);
  972. return ignore_errors;
  973. }
  974. PQclear(result);
  975. return true;
  976. }
  977. /*
  978. * Get, during abort cleanup, the result of a query that is in progress. This
  979. * might be a query that is being interrupted by transaction abort, or it might
  980. * be a query that was initiated as part of transaction abort to get the remote
  981. * side back to the appropriate state.
  982. *
  983. * It's not a huge problem if we throw an ERROR here, but if we get into error
  984. * recursion trouble, we'll end up slamming the connection shut, which will
  985. * necessitate failing the entire toplevel transaction even if subtransactions
  986. * were used. Try to use WARNING where we can.
  987. *
  988. * endtime is the time at which we should give up and assume the remote
  989. * side is dead. Returns true if the timeout expired, otherwise false.
  990. * Sets *result except in case of a timeout.
  991. */
  992. static bool
  993. pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
  994. {
  995. volatile bool timed_out = false;
  996. PGresult *volatile last_res = NULL;
  997. /* In what follows, do not leak any PGresults on an error. */
  998. PG_TRY();
  999. {
  1000. for (;;)
  1001. {
  1002. PGresult *res;
  1003. while (PQisBusy(conn))
  1004. {
  1005. int wc;
  1006. TimestampTz now = GetCurrentTimestamp();
  1007. long secs;
  1008. int microsecs;
  1009. long cur_timeout;
  1010. /* If timeout has expired, give up, else get sleep time. */
  1011. if (now >= endtime)
  1012. {
  1013. timed_out = true;
  1014. goto exit;
  1015. }
  1016. TimestampDifference(now, endtime, &secs, &microsecs);
  1017. /* To protect against clock skew, limit sleep to one minute. */
  1018. cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
  1019. /* Sleep until there's something to do */
  1020. wc = WaitLatchOrSocket(MyLatch,
  1021. WL_LATCH_SET | WL_SOCKET_READABLE |
  1022. WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
  1023. PQsocket(conn),
  1024. cur_timeout, PG_WAIT_EXTENSION);
  1025. ResetLatch(MyLatch);
  1026. CHECK_FOR_INTERRUPTS();
  1027. /* Data available in socket? */
  1028. if (wc & WL_SOCKET_READABLE)
  1029. {
  1030. if (!PQconsumeInput(conn))
  1031. {
  1032. /* connection trouble; treat the same as a timeout */
  1033. timed_out = true;
  1034. goto exit;
  1035. }
  1036. }
  1037. }
  1038. res = PQgetResult(conn);
  1039. if (res == NULL)
  1040. break; /* query is complete */
  1041. PQclear(last_res);
  1042. last_res = res;
  1043. }
  1044. exit: ;
  1045. }
  1046. PG_CATCH();
  1047. {
  1048. PQclear(last_res);
  1049. PG_RE_THROW();
  1050. }
  1051. PG_END_TRY();
  1052. if (timed_out)
  1053. PQclear(last_res);
  1054. else
  1055. *result = last_res;
  1056. return timed_out;
  1057. }