diff --git a/Pipfile.lock b/Pipfile.lock index 19afd44b..7a26f213 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -50,37 +50,37 @@ }, "cython": { "hashes": [ - "sha256:019008a69e6b7c102f2ed3d733a288d1784363802b437dd2b91e6256b12746da", - "sha256:1441fe19c56c90b8c2159d7b861c31a134d543ef7886fd82a5d267f9f11f35ac", - "sha256:1d1a5e9d6ed415e75a676b72200ad67082242ec4d2d76eb7446da255ae72d3f7", - "sha256:339f5b985de3662b1d6c69991ab46fdbdc736feb4ac903ef6b8c00e14d87f4d8", - "sha256:35bdf3f48535891fee2eaade70e91d5b2cc1ee9fc2a551847c7ec18bce55a92c", - "sha256:3d0afba0aec878639608f013045697fb0969ff60b3aea2daec771ea8d01ad112", - "sha256:42c53786806e24569571a7a24ebe78ec6b364fe53e79a3f27eddd573cacd398f", - "sha256:48b919da89614d201e72fbd8247b5ae8881e296cf968feb5595a015a14c67f1f", - "sha256:49906e008eeb91912654a36c200566392bd448b87a529086694053a280f8af2d", - "sha256:49fc01a7c9c4e3c1784e9a15d162c2cac3990fcc28728227a6f8f0837aabda7c", - "sha256:501b671b639b9ca17ad303f8807deb1d0ff754d1dab106f2607d14b53cb0ff0b", - "sha256:5574574142364804423ab4428bd331a05c65f7ecfd31ac97c936f0c720fe6a53", - "sha256:6092239a772b3c6604be9e94b9ab4f0dacb7452e8ad299fd97eae0611355b679", - "sha256:71ff5c7632501c4f60edb8a24fd0a772e04c5bdca2856d978d04271b63666ef7", - "sha256:7dcf2ad14e25b05eda8bdd104f8c03a642a384aeefd25a5b51deac0826e646fa", - "sha256:8ca3a99f5a7443a6a8f83a5d8fcc11854b44e6907e92ba8640d8a8f7b9085e21", - "sha256:927da3b5710fb705aab173ad630b45a4a04c78e63dcd89411a065b2fe60e4770", - "sha256:94916d1ede67682638d3cc0feb10648ff14dc51fb7a7f147f4fedce78eaaea97", - "sha256:a3e5e5ca325527d312cdb12a4dab8b0459c458cad1c738c6f019d0d8d147081c", - "sha256:a7716a98f0b9b8f61ddb2bae7997daf546ac8fc594be6ba397f4bde7d76bfc62", - "sha256:acf10d1054de92af8d5bfc6620bb79b85f04c98214b4da7db77525bfa9fc2a89", - "sha256:de46ffb67e723975f5acab101c5235747af1e84fbbc89bf3533e2ea93fb26947", - "sha256:df428969154a9a4cd9748c7e6efd18432111fbea3d700f7376046c38c5e27081", - "sha256:f5ebf24b599caf466f9da8c4115398d663b2567b89e92f58a835e9da4f74669f", - "sha256:f79e45d5c122c4fb1fd54029bf1d475cecc05f4ed5b68136b0d6ec268bae68b6", - "sha256:f7a43097d143bd7846ffba6d2d8cd1cc97f233318dbd0f50a235ea01297a096b", - "sha256:fceb8271bc2fd3477094ca157c824e8ea840a7b393e89e766eea9a3b9ce7e0c6", - "sha256:ff919ceb40259f5332db43803aa6c22ff487e86036ce3921ae04b9185efc99a4" + "sha256:0202f753b0a69dd87095b698df00010daf452ab61279747248a042a24892a2a9", + "sha256:0fbe9514ffe35aad337db27b11f7ee1bf27d01059b2e27f112315b185d69de79", + "sha256:18ab7646985a97e02cee72e1ddba2e732d4931d4e1732494ff30c5aa084bfb97", + "sha256:18bb95daa41fd2ff0102844172bc068150bf031186249fc70c6f57fc75c9c0a9", + "sha256:222c65c7022ff52faf3ac6c706e4e8a726ddaa29dabf2173b2a0fdfc1a2f1586", + "sha256:2387c5a2a436669de9157d117fd426dfc2b46ffdc49e43f0a2267380896c04ea", + "sha256:31bad130b701587ab7e74c3c304bb3d63d9f0d365e3f81880203e8e476d914b1", + "sha256:3895014b1a653726a9da5aca852d9e6d0e2c2667bf315d6a2cd632bf7463130b", + "sha256:3d38967ef9c1c0ffabe80827f56817609153e2da83e3dce84476d0928c72972c", + "sha256:5478efd92291084adc9b679666aeaeaafca69d6bf3e95fe3efce82814e3ab782", + "sha256:5c2a6121e4e1e65690b60c270012218e38201bcf700314b1926d5dbeae78a499", + "sha256:5f66f7f76fc870500fe6db0c02d5fc4187062d29e582431f5a986881c5aef4e3", + "sha256:6572d74990b16480608441b941c1cefd60bf742416bc3668cf311980f740768d", + "sha256:6990b9965f31762ac71340869c064f39fb6776beca396d0558d3b5b1ebb7f027", + "sha256:87c82803f9c51c275b16c729aade952ca93c74a8aec963b9b8871df9bbb3120a", + "sha256:8fd32974024052b2260d08b94f970c4c1d92c327ed3570a2b4708070fa53a879", + "sha256:9a81bba33c7fbdb76e6fe8d15b6e793a1916afd4d2463f07d762c69efaaea466", + "sha256:9c31cb9bfaa1004a2a50115a37e1fcb79d664917968399dae3e04610356afe8c", + "sha256:a0b28235c28a088e052f90a0b5fefaa503e5378046a29d0af045e2ec9d5d6555", + "sha256:a3f5022d818b6c91a8bbc466211e6fd708f234909cbb10bc4dbccb2a04884ef6", + "sha256:a7252ca498f510404185e3c1bdda3224e80b1be1a5fbc2b174aab83a477ea0cb", + "sha256:aa8d7136cad8b2a7bf3596e1bc053476edeee567271f197449b2d30ea0c37175", + "sha256:b50a8de6f2820286129fe7d71d76c9e0c0f53a8c83cf39bbe6375b827994e4f1", + "sha256:b528a9c152c569062375d5c2260b59f8243bb4136fc38420854ac1bd4aa0d02f", + "sha256:b72db7201a4aa0445f27af9954d48ed7d2c119ce3b8b253e4dcd514fc72e5dc6", + "sha256:d3444e10ccb5b16e4c1bed3cb3c565ec676b20a21eb43430e70ec4168c631dcc", + "sha256:e16d6f06f4d2161347e51c4bc1f7a8feedeee444d26efa92243f18441a6fa742", + "sha256:f5774bef92d33a62a584f6e7552a9a8653241ecc036e259bfb03d33091599537" ], "index": "pypi", - "version": "==0.29" + "version": "==0.29.1" }, "e1839a8": { "editable": true, @@ -112,24 +112,16 @@ }, "msgpack": { "hashes": [ - "sha256:0b3b1773d2693c70598585a34ca2715873ba899565f0a7c9a1545baef7e7fbdc", - "sha256:0bae5d1538c5c6a75642f75a1781f3ac2275d744a92af1a453c150da3446138b", - "sha256:0ee8c8c85aa651be3aa0cd005b5931769eaa658c948ce79428766f1bd46ae2c3", - "sha256:1369f9edba9500c7a6489b70fdfac773e925342f4531f1e3d4c20ac3173b1ae0", - "sha256:22d9c929d1d539f37da3d1b0e16270fa9d46107beab8c0d4d2bddffffe895cee", - "sha256:2ff43e3247a1e11d544017bb26f580a68306cec7a6257d8818893c1fda665f42", - "sha256:31a98047355d34d047fcdb55b09cb19f633cf214c705a765bd745456c142130c", - "sha256:8767eb0032732c3a0da92cbec5ac186ef89a3258c6edca09161472ca0206c45f", - "sha256:8acc8910218555044e23826980b950e96685dc48124a290c86f6f41a296ea172", - "sha256:ab189a6365be1860a5ecf8159c248f12d33f79ea799ae9695fa6a29896dcf1d4", - "sha256:cfd6535feb0f1cf1c7cdb25773e965cc9f92928244a8c3ef6f8f8a8e1f7ae5c4", - "sha256:e274cd4480d8c76ec467a85a9c6635bbf2258f0649040560382ab58cabb44bcf", - "sha256:f86642d60dca13e93260187d56c2bef2487aa4d574a669e8ceefcf9f4c26fd00", - "sha256:f8a57cbda46a94ed0db55b73e6ab0c15e78b4ede8690fa491a0e55128d552bb0", - "sha256:fcea97a352416afcbccd7af9625159d80704a25c519c251c734527329bb20d0e" + "sha256:102802a9433dcf36f939b632cce9dea87310b2f163bb37ffc8bc343677726e88", + "sha256:64abc6bf3a2ac301702f5760f4e6e227d0fd4d84d9014ef9a40faa9d43365259", + "sha256:72259661a83f8b08ef6ee83927ce4937f841226735824af5b10a536d886eeb36", + "sha256:85f1342b9d7549dd3daf494100d47a3dc7daae703cdbfc2c9ee7bbdc8a492cba", + "sha256:8ce9f88b6cb75d74eda2a5522e5c2e5ec0f17fd78605d6502abb61f46b306865", + "sha256:9936ce3a530ca78db60b6631003b5f4ba383cfb1d9830a27d1b5c61857226e2f", + "sha256:cb4e228f3d93779a1d77a1e9d72759b79dfa2975c1a5bd2a090eaa98239fa4b1" ], "index": "pypi", - "version": "==0.5.6" + "version": "==0.6.0" }, "multio": { "hashes": [ @@ -204,17 +196,17 @@ }, "pdbpp": { "hashes": [ - "sha256:dde77326e4ea41439c243ed065826d53539530eeabd1b6615aae15cfbb9fda05" + "sha256:535085916fcfb768690ba0aeab2967c2a2163a0a60e5b703776846873e171399" ], "index": "pypi", - "version": "==0.9.2" + "version": "==0.9.3" }, "pygments": { "hashes": [ - "sha256:78f3f434bcc5d6ee09020f92ba487f95ba50f1e3ef83ae96b9d5ffa1bab25c5d", - "sha256:dbae1046def0efb574852fab9e90209b23f556367b5a320c0bcb871c77c3e8cc" + "sha256:6301ecb0997a52d2d31385e62d0a4a4cf18d2f2da7054a5ddad5c366cd39cee7", + "sha256:82666aac15622bd7bb685a4ee7f6625dd716da3ef7473620c192c0168aae64fc" ], - "version": "==2.2.0" + "version": "==2.3.0" }, "python-dateutil": { "hashes": [ @@ -246,14 +238,14 @@ }, "sortedcontainers": { "hashes": [ - "sha256:220bb2e3e1886297fd7cdd6d164cb5cf237be1cfae1a3a3e526d149c52816682", - "sha256:b74f2756fb5e23512572cc76f0fe0832fd86310f77dfee54335a35fb33f6b950" + "sha256:974e9a32f56b17c1bac2aebd9dcf197f3eb9cd30553c5852a3187ad162e1a03a", + "sha256:d9e96492dd51fae31e60837736b38fe42a187b5404c16606ff7ee7cd582d4c60" ], - "version": "==2.0.5" + "version": "==2.1.0" }, "tractor": { "git": "git://github.com/tgoodlet/tractor.git", - "ref": "71bb87aa3a249af37ec68d00b0a5853f58923f1e" + "ref": "c0cdb3945a9a9538b65bd76038f263e859fbbfe7" }, "trio": { "hashes": [ @@ -314,37 +306,37 @@ }, "cython": { "hashes": [ - "sha256:019008a69e6b7c102f2ed3d733a288d1784363802b437dd2b91e6256b12746da", - "sha256:1441fe19c56c90b8c2159d7b861c31a134d543ef7886fd82a5d267f9f11f35ac", - "sha256:1d1a5e9d6ed415e75a676b72200ad67082242ec4d2d76eb7446da255ae72d3f7", - "sha256:339f5b985de3662b1d6c69991ab46fdbdc736feb4ac903ef6b8c00e14d87f4d8", - "sha256:35bdf3f48535891fee2eaade70e91d5b2cc1ee9fc2a551847c7ec18bce55a92c", - "sha256:3d0afba0aec878639608f013045697fb0969ff60b3aea2daec771ea8d01ad112", - "sha256:42c53786806e24569571a7a24ebe78ec6b364fe53e79a3f27eddd573cacd398f", - "sha256:48b919da89614d201e72fbd8247b5ae8881e296cf968feb5595a015a14c67f1f", - "sha256:49906e008eeb91912654a36c200566392bd448b87a529086694053a280f8af2d", - "sha256:49fc01a7c9c4e3c1784e9a15d162c2cac3990fcc28728227a6f8f0837aabda7c", - "sha256:501b671b639b9ca17ad303f8807deb1d0ff754d1dab106f2607d14b53cb0ff0b", - "sha256:5574574142364804423ab4428bd331a05c65f7ecfd31ac97c936f0c720fe6a53", - "sha256:6092239a772b3c6604be9e94b9ab4f0dacb7452e8ad299fd97eae0611355b679", - "sha256:71ff5c7632501c4f60edb8a24fd0a772e04c5bdca2856d978d04271b63666ef7", - "sha256:7dcf2ad14e25b05eda8bdd104f8c03a642a384aeefd25a5b51deac0826e646fa", - "sha256:8ca3a99f5a7443a6a8f83a5d8fcc11854b44e6907e92ba8640d8a8f7b9085e21", - "sha256:927da3b5710fb705aab173ad630b45a4a04c78e63dcd89411a065b2fe60e4770", - "sha256:94916d1ede67682638d3cc0feb10648ff14dc51fb7a7f147f4fedce78eaaea97", - "sha256:a3e5e5ca325527d312cdb12a4dab8b0459c458cad1c738c6f019d0d8d147081c", - "sha256:a7716a98f0b9b8f61ddb2bae7997daf546ac8fc594be6ba397f4bde7d76bfc62", - "sha256:acf10d1054de92af8d5bfc6620bb79b85f04c98214b4da7db77525bfa9fc2a89", - "sha256:de46ffb67e723975f5acab101c5235747af1e84fbbc89bf3533e2ea93fb26947", - "sha256:df428969154a9a4cd9748c7e6efd18432111fbea3d700f7376046c38c5e27081", - "sha256:f5ebf24b599caf466f9da8c4115398d663b2567b89e92f58a835e9da4f74669f", - "sha256:f79e45d5c122c4fb1fd54029bf1d475cecc05f4ed5b68136b0d6ec268bae68b6", - "sha256:f7a43097d143bd7846ffba6d2d8cd1cc97f233318dbd0f50a235ea01297a096b", - "sha256:fceb8271bc2fd3477094ca157c824e8ea840a7b393e89e766eea9a3b9ce7e0c6", - "sha256:ff919ceb40259f5332db43803aa6c22ff487e86036ce3921ae04b9185efc99a4" + "sha256:0202f753b0a69dd87095b698df00010daf452ab61279747248a042a24892a2a9", + "sha256:0fbe9514ffe35aad337db27b11f7ee1bf27d01059b2e27f112315b185d69de79", + "sha256:18ab7646985a97e02cee72e1ddba2e732d4931d4e1732494ff30c5aa084bfb97", + "sha256:18bb95daa41fd2ff0102844172bc068150bf031186249fc70c6f57fc75c9c0a9", + "sha256:222c65c7022ff52faf3ac6c706e4e8a726ddaa29dabf2173b2a0fdfc1a2f1586", + "sha256:2387c5a2a436669de9157d117fd426dfc2b46ffdc49e43f0a2267380896c04ea", + "sha256:31bad130b701587ab7e74c3c304bb3d63d9f0d365e3f81880203e8e476d914b1", + "sha256:3895014b1a653726a9da5aca852d9e6d0e2c2667bf315d6a2cd632bf7463130b", + "sha256:3d38967ef9c1c0ffabe80827f56817609153e2da83e3dce84476d0928c72972c", + "sha256:5478efd92291084adc9b679666aeaeaafca69d6bf3e95fe3efce82814e3ab782", + "sha256:5c2a6121e4e1e65690b60c270012218e38201bcf700314b1926d5dbeae78a499", + "sha256:5f66f7f76fc870500fe6db0c02d5fc4187062d29e582431f5a986881c5aef4e3", + "sha256:6572d74990b16480608441b941c1cefd60bf742416bc3668cf311980f740768d", + "sha256:6990b9965f31762ac71340869c064f39fb6776beca396d0558d3b5b1ebb7f027", + "sha256:87c82803f9c51c275b16c729aade952ca93c74a8aec963b9b8871df9bbb3120a", + "sha256:8fd32974024052b2260d08b94f970c4c1d92c327ed3570a2b4708070fa53a879", + "sha256:9a81bba33c7fbdb76e6fe8d15b6e793a1916afd4d2463f07d762c69efaaea466", + "sha256:9c31cb9bfaa1004a2a50115a37e1fcb79d664917968399dae3e04610356afe8c", + "sha256:a0b28235c28a088e052f90a0b5fefaa503e5378046a29d0af045e2ec9d5d6555", + "sha256:a3f5022d818b6c91a8bbc466211e6fd708f234909cbb10bc4dbccb2a04884ef6", + "sha256:a7252ca498f510404185e3c1bdda3224e80b1be1a5fbc2b174aab83a477ea0cb", + "sha256:aa8d7136cad8b2a7bf3596e1bc053476edeee567271f197449b2d30ea0c37175", + "sha256:b50a8de6f2820286129fe7d71d76c9e0c0f53a8c83cf39bbe6375b827994e4f1", + "sha256:b528a9c152c569062375d5c2260b59f8243bb4136fc38420854ac1bd4aa0d02f", + "sha256:b72db7201a4aa0445f27af9954d48ed7d2c119ce3b8b253e4dcd514fc72e5dc6", + "sha256:d3444e10ccb5b16e4c1bed3cb3c565ec676b20a21eb43430e70ec4168c631dcc", + "sha256:e16d6f06f4d2161347e51c4bc1f7a8feedeee444d26efa92243f18441a6fa742", + "sha256:f5774bef92d33a62a584f6e7552a9a8653241ecc036e259bfb03d33091599537" ], "index": "pypi", - "version": "==0.29" + "version": "==0.29.1" }, "fancycompleter": { "hashes": [ @@ -376,24 +368,16 @@ }, "msgpack": { "hashes": [ - "sha256:0b3b1773d2693c70598585a34ca2715873ba899565f0a7c9a1545baef7e7fbdc", - "sha256:0bae5d1538c5c6a75642f75a1781f3ac2275d744a92af1a453c150da3446138b", - "sha256:0ee8c8c85aa651be3aa0cd005b5931769eaa658c948ce79428766f1bd46ae2c3", - "sha256:1369f9edba9500c7a6489b70fdfac773e925342f4531f1e3d4c20ac3173b1ae0", - "sha256:22d9c929d1d539f37da3d1b0e16270fa9d46107beab8c0d4d2bddffffe895cee", - "sha256:2ff43e3247a1e11d544017bb26f580a68306cec7a6257d8818893c1fda665f42", - "sha256:31a98047355d34d047fcdb55b09cb19f633cf214c705a765bd745456c142130c", - "sha256:8767eb0032732c3a0da92cbec5ac186ef89a3258c6edca09161472ca0206c45f", - "sha256:8acc8910218555044e23826980b950e96685dc48124a290c86f6f41a296ea172", - "sha256:ab189a6365be1860a5ecf8159c248f12d33f79ea799ae9695fa6a29896dcf1d4", - "sha256:cfd6535feb0f1cf1c7cdb25773e965cc9f92928244a8c3ef6f8f8a8e1f7ae5c4", - "sha256:e274cd4480d8c76ec467a85a9c6635bbf2258f0649040560382ab58cabb44bcf", - "sha256:f86642d60dca13e93260187d56c2bef2487aa4d574a669e8ceefcf9f4c26fd00", - "sha256:f8a57cbda46a94ed0db55b73e6ab0c15e78b4ede8690fa491a0e55128d552bb0", - "sha256:fcea97a352416afcbccd7af9625159d80704a25c519c251c734527329bb20d0e" + "sha256:102802a9433dcf36f939b632cce9dea87310b2f163bb37ffc8bc343677726e88", + "sha256:64abc6bf3a2ac301702f5760f4e6e227d0fd4d84d9014ef9a40faa9d43365259", + "sha256:72259661a83f8b08ef6ee83927ce4937f841226735824af5b10a536d886eeb36", + "sha256:85f1342b9d7549dd3daf494100d47a3dc7daae703cdbfc2c9ee7bbdc8a492cba", + "sha256:8ce9f88b6cb75d74eda2a5522e5c2e5ec0f17fd78605d6502abb61f46b306865", + "sha256:9936ce3a530ca78db60b6631003b5f4ba383cfb1d9830a27d1b5c61857226e2f", + "sha256:cb4e228f3d93779a1d77a1e9d72759b79dfa2975c1a5bd2a090eaa98239fa4b1" ], "index": "pypi", - "version": "==0.5.6" + "version": "==0.6.0" }, "multio": { "hashes": [ @@ -468,10 +452,10 @@ }, "pdbpp": { "hashes": [ - "sha256:dde77326e4ea41439c243ed065826d53539530eeabd1b6615aae15cfbb9fda05" + "sha256:535085916fcfb768690ba0aeab2967c2a2163a0a60e5b703776846873e171399" ], "index": "pypi", - "version": "==0.9.2" + "version": "==0.9.3" }, "piker": { "editable": true, @@ -493,18 +477,18 @@ }, "pygments": { "hashes": [ - "sha256:78f3f434bcc5d6ee09020f92ba487f95ba50f1e3ef83ae96b9d5ffa1bab25c5d", - "sha256:dbae1046def0efb574852fab9e90209b23f556367b5a320c0bcb871c77c3e8cc" + "sha256:6301ecb0997a52d2d31385e62d0a4a4cf18d2f2da7054a5ddad5c366cd39cee7", + "sha256:82666aac15622bd7bb685a4ee7f6625dd716da3ef7473620c192c0168aae64fc" ], - "version": "==2.2.0" + "version": "==2.3.0" }, "pytest": { "hashes": [ - "sha256:3f193df1cfe1d1609d4c583838bea3d532b18d6160fd3f55c9447fdca30848ec", - "sha256:e246cf173c01169b9617fc07264b7b1316e78d7a650055235d6d897bc80d9660" + "sha256:1d131cc532be0023ef8ae265e2a779938d0619bb6c2510f52987ffcba7fa1ee4", + "sha256:ca4761407f1acc85ffd1609f464ca20bb71a767803505bd4127d0e45c5a50e23" ], "index": "pypi", - "version": "==3.10.1" + "version": "==4.0.1" }, "python-dateutil": { "hashes": [ @@ -536,10 +520,10 @@ }, "sortedcontainers": { "hashes": [ - "sha256:220bb2e3e1886297fd7cdd6d164cb5cf237be1cfae1a3a3e526d149c52816682", - "sha256:b74f2756fb5e23512572cc76f0fe0832fd86310f77dfee54335a35fb33f6b950" + "sha256:974e9a32f56b17c1bac2aebd9dcf197f3eb9cd30553c5852a3187ad162e1a03a", + "sha256:d9e96492dd51fae31e60837736b38fe42a187b5404c16606ff7ee7cd582d4c60" ], - "version": "==2.0.5" + "version": "==2.1.0" }, "trio": { "hashes": [ diff --git a/piker/_async_utils.py b/piker/_async_utils.py new file mode 100644 index 00000000..7069d597 --- /dev/null +++ b/piker/_async_utils.py @@ -0,0 +1,33 @@ +""" +Async utils no one seems to have built into a core lib (yet). +""" +from collections import OrderedDict + + +def async_lifo_cache(maxsize=128): + """Async ``cache`` with a LIFO policy. + + Implemented my own since no one else seems to have + a standard. I'll wait for the smarter people to come + up with one, but until then... + """ + cache = OrderedDict() + + def decorator(fn): + + async def wrapper(*args): + key = args + try: + return cache[key] + except KeyError: + if len(cache) >= maxsize: + # discard last added new entry + cache.popitem() + + # do it + cache[key] = await fn(*args) + return cache[key] + + return wrapper + + return decorator diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 60f4292c..b493a9e7 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -6,6 +6,7 @@ from types import ModuleType from typing import List, Dict, Any, Optional from ..log import get_logger +from .data import DataFeed log = get_logger('broker.core') @@ -19,7 +20,7 @@ async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict: meth = getattr(client.api, methname, None) if meth is None: log.warning( - "Couldn't find API method {methname} looking up on client") + f"Couldn't find API method {methname} looking up on client") meth = getattr(client, methname, None) if meth is None: @@ -45,14 +46,10 @@ async def stocks_quote( """Return quotes dict for ``tickers``. """ async with brokermod.get_client() as client: - results = await client.quote(tickers) - for key, val in results.items(): - if val is None: - brokermod.log.warn(f"Could not find symbol {key}?") - - return results + return await client.quote(tickers) +# TODO: these need tests async def option_chain( brokermod: ModuleType, symbol: str, @@ -67,7 +64,8 @@ async def option_chain( if date: id = int((await client.tickers2ids([symbol]))[symbol]) # build contracts dict for single expiry - return await client.option_chains({id: {date: None}}) + return await client.option_chains( + {(symbol, id, date): {}}) else: # get all contract expiries # (takes a long-ass time on QT fwiw) @@ -83,4 +81,5 @@ async def contracts( """Return option contracts (all expiries) for ``symbol``. """ async with brokermod.get_client() as client: + # return await client.get_all_contracts([symbol]) return await client.get_all_contracts([symbol]) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index b1427cab..56e99fac 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -7,7 +7,10 @@ from itertools import cycle import socket import json from types import ModuleType -from typing import Coroutine, Callable, Dict, List +import typing +from typing import Coroutine, Callable, Dict, List, Any +import contextlib +from operator import itemgetter import trio import tractor @@ -56,12 +59,14 @@ async def stream_quotes( _cache = {} # ticker to quote caching while True: # use an event here to trigger exit? + prequote_start = time.time() - # tickers = list(tickers2chans.keys()) with trio.move_on_after(3) as cancel_scope: quotes = await request_quotes() + postquote_start = time.time() + cancelled = cancel_scope.cancelled_caught if cancelled: log.warn("Quote query timed out after 3 seconds, retrying...") @@ -69,18 +74,20 @@ async def stream_quotes( # quotes = await wait_for_network(partial(get_quotes, tickers)) quotes = await wait_for_network(request_quotes) - postquote_start = time.time() - new_quotes = {} + new_quotes = [] if diff_cached: - # if cache is enabled then only deliver "new" changes - for symbol, quote in quotes.items(): - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote - new_quotes[symbol] = quote + # If cache is enabled then only deliver "new" changes. + # Useful for polling setups but obviously should be + # disabled if you're rx-ing event data. + for quote in quotes: + symbol = quote['symbol'] + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + new_quotes.append(quote) else: new_quotes = quotes @@ -101,31 +108,51 @@ async def stream_quotes( await trio.sleep(delay) +class DataFeed(typing.NamedTuple): + """A per broker "data feed" container. + + A structure to keep track of components used by + real-time data daemons. + """ + mod: ModuleType + client: object + quoter_keys: List[str] = ['stock', 'option'] + tasks: Dict[str, trio._core._run.Task] = dict.fromkeys( + quoter_keys, False) + quoters: Dict[str, typing.Coroutine] = {} + subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}} + + async def fan_out_to_chans( - brokermod: ModuleType, + feed: DataFeed, get_quotes: Coroutine, - tickers2chans: Dict[str, tractor.Channel], + symbols2chans: Dict[str, tractor.Channel], rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue cid: str = None, ) -> None: """Request and fan out quotes to each subscribed actor channel. """ - broker_limit = getattr(brokermod, '_rate_limit', float('inf')) + broker_limit = getattr(feed.mod, '_rate_limit', float('inf')) if broker_limit < rate: rate = broker_limit - log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") + log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") async def request(): """Get quotes for current symbol subscription set. """ - return await get_quotes(list(tickers2chans.keys())) + return await get_quotes(list(symbols2chans.keys())) - async for quotes in stream_quotes(brokermod, request, rate): + async for quotes in stream_quotes( + feed.mod, request, rate, + diff_cached=diff_cached, + ): chan_payloads = {} - for symbol, quote in quotes.items(): + for quote in quotes: + # is this too QT specific? + symbol = quote['symbol'] # set symbol quotes for each subscriber - for chan, cid in tickers2chans.get(symbol, set()): + for chan, cid in symbols2chans.get(quote['key'], set()): chan_payloads.setdefault( chan, {'yield': {}, 'cid': cid} @@ -142,41 +169,21 @@ async def fan_out_to_chans( ConnectionRefusedError, ): log.warn(f"{chan} went down?") - for chanset in tickers2chans.values(): + for chanset in symbols2chans.values(): chanset.discard((chan, cid)) - if not any(tickers2chans.values()): - log.warn(f"No subs left for broker {brokermod.name}, exiting task") + if not any(symbols2chans.values()): + log.warn(f"No subs left for broker {feed.mod.name}, exiting task") break - log.info(f"Terminating stream quoter task for {brokermod.name}") + log.info(f"Terminating stream quoter task for {feed.mod.name}") -async def get_cached_client(broker, tickers): - """Get or create the current actor's cached broker client. - """ - # check if a cached client is in the local actor's statespace - clients = tractor.current_actor().statespace.setdefault('clients', {}) - try: - return clients[broker] - except KeyError: - log.info(f"Creating new client for broker {broker}") - brokermod = get_brokermod(broker) - # TODO: move to AsyncExitStack in 3.7 - client_cntxmng = brokermod.get_client() - client = await client_cntxmng.__aenter__() - get_quotes = await brokermod.quoter(client, tickers) - clients[broker] = ( - brokermod, client, client_cntxmng, get_quotes) - - return brokermod, client, client_cntxmng, get_quotes - - -async def symbol_data(broker, tickers): +async def symbol_data(broker: str, tickers: List[str]): """Retrieve baseline symbol info from broker. """ - _, client, _, get_quotes = await get_cached_client(broker, tickers) - return await client.symbol_data(tickers) + feed = await get_cached_feed(broker) + return await feed.client.symbol_data(tickers) async def smoke_quote(get_quotes, tickers, broker): @@ -192,8 +199,9 @@ async def smoke_quote(get_quotes, tickers, broker): # since the new client needs to know what symbols are accepted log.warn(f"Retrieving smoke quote for symbols {tickers}") quotes = await get_quotes(tickers) + # report any tickers that aren't returned in the first quote - invalid_tickers = set(tickers) - set(quotes) + invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes)) for symbol in invalid_tickers: tickers.remove(symbol) log.warn( @@ -202,7 +210,8 @@ async def smoke_quote(get_quotes, tickers, broker): # pop any tickers that return "empty" quotes payload = {} - for symbol, quote in quotes.items(): + for quote in quotes: + symbol = quote['symbol'] if quote is None: log.warn( f"Symbol `{symbol}` not found by broker" @@ -210,6 +219,12 @@ async def smoke_quote(get_quotes, tickers, broker): # XXX: not this mutates the input list (for now) tickers.remove(symbol) continue + + # report any unknown/invalid symbols (QT specific) + if quote.get('low52w', False) is None: + log.warn( + f"{symbol} seems to be defunct") + payload[symbol] = quote return payload @@ -218,23 +233,25 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### -def modify_quote_stream(broker, tickers, chan=None, cid=None): +async def modify_quote_stream(broker, feed_type, symbols, chan=None, cid=None): """Absolute symbol subscription list for each quote stream. - Effectively a consumer subscription api. + Effectively a symbol subscription api. """ - log.info(f"{chan} changed symbol subscription to {tickers}") - ss = tractor.current_actor().statespace - broker2tickersubs = ss['broker2tickersubs'] - tickers2chans = broker2tickersubs.get(broker) + log.info(f"{chan} changed symbol subscription to {symbols}") + feed = await get_cached_feed(broker) + symbols2chans = feed.subscriptions[feed_type] # update map from each symbol to requesting client's chan - for ticker in tickers: - tickers2chans.setdefault(ticker, set()).add((chan, cid)) + for ticker in symbols: + symbols2chans.setdefault(ticker, set()).add((chan, cid)) + # remove any existing symbol subscriptions if symbol is not + # found in ``symbols`` + # TODO: this can likely be factored out into the pub-sub api for ticker in filter( - lambda ticker: ticker not in tickers, tickers2chans.copy() + lambda ticker: ticker not in symbols, symbols2chans.copy() ): - chanset = tickers2chans.get(ticker) + chanset = symbols2chans.get(ticker) # XXX: cid will be different on unsub call for item in chanset.copy(): if chan in item: @@ -242,12 +259,42 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None): if not chanset: # pop empty sets which will trigger bg quoter task termination - tickers2chans.pop(ticker) + symbols2chans.pop(ticker) + + +async def get_cached_feed( + brokername: str, +) -> DataFeed: + """Get/create a ``DataFeed`` from/in the current actor. + """ + # check if a cached client is in the local actor's statespace + ss = tractor.current_actor().statespace + feeds = ss['feeds'] + lock = feeds['_lock'] + feed_stack = ss['feed_stacks'][brokername] + async with lock: + try: + feed = feeds[brokername] + log.info(f"Subscribing with existing `{brokername}` daemon") + return feed + except KeyError: + log.info(f"Creating new client for broker {brokername}") + brokermod = get_brokermod(brokername) + client = await feed_stack.enter_async_context( + brokermod.get_client()) + feed = DataFeed( + mod=brokermod, + client=client, + ) + feeds[brokername] = feed + return feed async def start_quote_stream( broker: str, - tickers: List[str], + symbols: List[Any], + feed_type: str = 'stock', + diff_cached: bool = True, chan: tractor.Channel = None, cid: str = None, ) -> None: @@ -263,57 +310,75 @@ async def start_quote_stream( get_console_log(actor.loglevel) # pull global vars from local actor ss = actor.statespace - broker2tickersubs = ss['broker2tickersubs'] - clients = ss['clients'] - dtasks = ss['dtasks'] - tickers = list(tickers) + # broker2symbolsubs = ss.setdefault('broker2symbolsubs', {}) + ss.setdefault('feeds', {'_lock': trio.Lock()}) + feed_stacks = ss.setdefault('feed_stacks', {}) + symbols = list(symbols) log.info( - f"{chan.uid} subscribed to {broker} for tickers {tickers}") + f"{chan.uid} subscribed to {broker} for symbols {symbols}") + feed_stack = feed_stacks.setdefault(broker, contextlib.AsyncExitStack()) + # another actor task may have already created it + feed = await get_cached_feed(broker) + symbols2chans = feed.subscriptions[feed_type] - brokermod, client, _, get_quotes = await get_cached_client(broker, tickers) - if broker not in broker2tickersubs: - tickers2chans = broker2tickersubs.setdefault(broker, {}) - else: - log.info(f"Subscribing with existing `{broker}` daemon") - tickers2chans = broker2tickersubs[broker] - - # do a smoke quote (note this mutates the input list and filters - # out bad symbols for now) - payload = await smoke_quote(get_quotes, tickers, broker) - # push initial smoke quote response for client initialization - await chan.send({'yield': payload, 'cid': cid}) + if feed_type == 'stock': + get_quotes = feed.quoters.setdefault( + 'stock', + await feed.mod.stock_quoter(feed.client, symbols) + ) + # do a smoke quote (note this mutates the input list and filters + # out bad symbols for now) + payload = await smoke_quote(get_quotes, symbols, broker) + # push initial smoke quote response for client initialization + await chan.send({'yield': payload, 'cid': cid}) + elif feed_type == 'option': + # FIXME: yeah we need maybe a more general way to specify + # the arg signature for the option feed beasides a symbol + # + expiry date. + get_quotes = feed.quoters.setdefault( + 'option', + await feed.mod.option_quoter(feed.client, symbols) + ) # update map from each symbol to requesting client's chan - modify_quote_stream(broker, tickers, chan=chan, cid=cid) + await modify_quote_stream(broker, feed_type, symbols, chan, cid) try: - if broker not in dtasks: - # no quoter task yet so start a daemon task - log.info(f"Spawning quoter task for {brokermod.name}") - async with trio.open_nursery() as nursery: - nursery.start_soon(partial( - fan_out_to_chans, brokermod, get_quotes, tickers2chans, - cid=cid) - ) - dtasks.add(broker) - + if not feed.tasks.get(feed_type): + # no data feeder task yet; so start one + respawn = True + log.info(f"Spawning data feed task for {feed.mod.name}") + while respawn: + respawn = False + try: + async with trio.open_nursery() as nursery: + nursery.start_soon( + partial( + fan_out_to_chans, feed, get_quotes, + symbols2chans, + diff_cached=diff_cached, + cid=cid + ) + ) + feed.tasks[feed_type] = True + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True # unblocks when no more symbols subscriptions exist and the # quote streamer task terminates (usually because another call # was made to `modify_quoter` to unsubscribe from streaming # symbols) - log.info(f"Terminated quoter task for {brokermod.name}") - - # TODO: move to AsyncExitStack in 3.7 - for _, _, cntxmng, _ in clients.values(): - # FIXME: yes I know there's no error handling.. - await cntxmng.__aexit__(None, None, None) finally: + log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}") + feed.tasks.pop(feed_type) # if there are truly no more subscriptions with this broker # drop from broker subs dict - if not any(tickers2chans.values()): + if not any(symbols2chans.values()): log.info(f"No more subscriptions for {broker}") - broker2tickersubs.pop(broker, None) - dtasks.discard(broker) + # broker2symbolsubs.pop(broker, None) + + # destroy the API client + await feed_stack.aclose() async def stream_to_file( diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 0f8738a5..330580af 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -5,7 +5,8 @@ import time from datetime import datetime from functools import partial import configparser -from typing import List, Tuple, Dict, Any +from operator import itemgetter +from typing import List, Tuple, Dict, Any, Iterator, NamedTuple import trio from async_generator import asynccontextmanager @@ -14,6 +15,7 @@ from ..calc import humanize, percent_change from . import config from ._util import resproc, BrokerError from ..log import get_logger, colorize_json +from .._async_utils import async_lifo_cache # TODO: move to urllib3/requests once supported import asks @@ -23,13 +25,19 @@ log = get_logger(__name__) _refresh_token_ep = 'https://login.questrade.com/oauth2/' _version = 'v1' -_rate_limit = 3 # queries/sec +_rate_limit = 4 # queries/sec class QuestradeError(Exception): "Non-200 OK response code" +class ContractsKey(NamedTuple): + symbol: str + id: int + expiry: datetime + + class _API: """Questrade API endpoints exposed as methods and wrapped with an http session. @@ -60,7 +68,11 @@ class _API: 'symbols', params={'ids': ids, 'names': names}) async def quotes(self, ids: str) -> dict: - return await self._request('markets/quotes', params={'ids': ids}) + quotes = (await self._request( + 'markets/quotes', params={'ids': ids}))['quotes'] + for quote in quotes: + quote['key'] = quote['symbol'] + return quotes async def candles(self, id: str, start: str, end, interval) -> dict: return await self._request(f'markets/candles/{id}', params={}) @@ -78,20 +90,19 @@ class _API: async def option_quotes( self, - contracts: Dict[int, Dict[str, dict]], + contracts: Dict[ContractsKey, Dict[int, dict]], option_ids: List[int] = [], # if you don't want them all ) -> dict: - "Retrieve option chain quotes for all option ids or by filter(s)." + """Retrieve option chain quotes for all option ids or by filter(s). + """ filters = [ { "underlyingId": int(symbol_id), "expiryDate": str(expiry), } # every expiry per symbol id - for symbol_id, expiries in contracts.items() - for expiry in expiries + for (symbol, symbol_id, expiry), bystrike in contracts.items() ] - resp = await self._sess.post( path=f'/markets/quotes/options', json={'filters': filters, 'optionIds': option_ids} @@ -110,8 +121,9 @@ class Client: self.api = _API(self._sess) self._conf = config self.access_data = {} - self.user_data = {} self._reload_config(config) + self._symbol_cache: Dict[str, int] = {} + self._contracts2expiries = {} def _reload_config(self, config=None, **kwargs): log.warn("Reloading access config data") @@ -212,12 +224,25 @@ class Client: async def tickers2ids(self, tickers): """Helper routine that take a sequence of ticker symbols and returns - their corresponding QT symbol ids. + their corresponding QT numeric symbol ids. + + Cache any symbol to id lookups for later use. """ - data = await self.api.symbols(names=','.join(tickers)) + cache = self._symbol_cache symbols2ids = {} - for ticker, symbol in zip(tickers, data['symbols']): - symbols2ids[symbol['symbol']] = str(symbol['symbolId']) + for symbol in tickers: + id = cache.get(symbol) + if id is not None: + symbols2ids[symbol] = id + + # still missing uncached values - hit the server + to_lookup = list(set(tickers) - set(symbols2ids)) + if to_lookup: + data = await self.api.symbols(names=','.join(to_lookup)) + for ticker, symbol in zip(to_lookup, data['symbols']): + name = symbol['symbol'] + assert name == ticker + cache[name] = symbols2ids[name] = str(symbol['symbolId']) return symbols2ids @@ -236,22 +261,17 @@ class Client: """Return stock quotes for each ticker in ``tickers``. """ t2ids = await self.tickers2ids(tickers) - ids = ','.join(t2ids.values()) - results = (await self.api.quotes(ids=ids))['quotes'] - quotes = {quote['symbol']: quote for quote in results} - - # set None for all symbols not found - if len(t2ids) < len(tickers): - for ticker in tickers: - if ticker not in quotes: - quotes[ticker] = None + quotes = [] + if t2ids: + ids = ','.join(t2ids.values()) + quotes = (await self.api.quotes(ids=ids)) return quotes async def symbol2contracts( self, symbol: str - ) -> Tuple[int, Dict[datetime, dict]]: + ) -> Dict[Tuple[str, int, datetime], dict]: """Return option contract for the given symbol. The most useful part is the expiries which can be passed to the option @@ -259,15 +279,18 @@ class Client: """ id = int((await self.tickers2ids([symbol]))[symbol]) contracts = await self.api.option_contracts(id) - return id, { - # convert to native datetime objs for sorting - datetime.fromisoformat(item['expiryDate']): - item for item in contracts + return { + ContractsKey( + symbol=symbol, + id=id, + # convert to native datetime objs for sorting + expiry=datetime.fromisoformat(item['expiryDate'])): + item for item in contracts } async def get_all_contracts( self, - symbols: List[str], + symbols: Iterator[str], # {symbol_id: {dt_iso_contract: {strike_price: {contract_id: id}}}} ) -> Dict[int, Dict[str, Dict[int, Any]]]: """Look up all contracts for each symbol in ``symbols`` and return the @@ -278,21 +301,29 @@ class Client: per symbol) and thus the return values should be cached for use with ``option_chains()``. """ - by_id = {} + by_key = {} for symbol in symbols: - id, contracts = await self.symbol2contracts(symbol) - by_id[id] = { - dt.isoformat(timespec='microseconds'): { + contracts = await self.symbol2contracts(symbol) + # FIXME: chainPerRoot here is probably why in some UIs + # you see a second chain with a (1) suffixed; should + # probably handle this eventually. + for key, byroot in sorted( + # sort by datetime + contracts.items(), + key=lambda item: item[0].expiry + ): + by_key[ + ContractsKey( + key.symbol, + key.id, + # converting back - maybe just do this initially? + key.expiry.isoformat(timespec='microseconds'), + ) + ] = { item['strikePrice']: item for item in byroot['chainPerRoot'][0]['chainPerStrikePrice'] } - for dt, byroot in sorted( - # sort by datetime - contracts.items(), - key=lambda item: item[0] - ) - } - return by_id + return by_key async def option_chains( self, @@ -301,12 +332,14 @@ class Client: ) -> Dict[str, Dict[str, Dict[str, Any]]]: """Return option chain snap quote for each ticker in ``symbols``. """ - quotes = await self.api.option_quotes(contracts) - batch = {} - for quote in quotes: - batch.setdefault( - quote['underlying'], {} - )[quote['symbol']] = quote + batch = [] + for key, bystrike in contracts.items(): + quotes = await self.api.option_quotes({key: bystrike}) + for quote in quotes: + # index by .symbol, .expiry since that's what + # a subscriber (currently) sends initially + quote['key'] = (key[0], key[2]) + batch.extend(quotes) return batch @@ -376,76 +409,115 @@ async def get_client() -> Client: write_conf(client) -async def quoter(client: Client, tickers: List[str]): - """Stock Quoter context. +async def stock_quoter(client: Client, tickers: List[str]): + """Stock quoter context. Yeah so fun times..QT has this symbol to ``int`` id lookup system that you have to use to get any quotes. That means we try to be smart and maintain a cache of this map lazily as requests from in for new tickers/symbols. Most of the closure variables here are to deal with that. """ - t2ids = {} - ids = '' - - def filter_symbols(quotes_dict: dict): - nonlocal t2ids - for symbol, quote in quotes_dict.items(): - if quote['low52w'] is None: - log.warn( - f"{symbol} seems to be defunct discarding from tickers") - t2ids.pop(symbol) + @async_lifo_cache(maxsize=128) + async def get_symbol_id_seq(symbols: Tuple[str]): + """For each tuple ``(symbol_1, symbol_2, ... , symbol_n)`` + return a symbol id sequence string ``'id_1,id_2, ... , id_n'``. + """ + return ','.join(map(str, (await client.tickers2ids(symbols)).values())) async def get_quote(tickers): """Query for quotes using cached symbol ids. """ if not tickers: + # don't hit the network return {} - nonlocal ids, t2ids - new, current = set(tickers), set(t2ids.keys()) - if new != current: - # update ticker ids cache - log.debug(f"Tickers set changed {new - current}") - t2ids = await client.tickers2ids(tickers) - # re-save symbol -> ids cache - ids = ','.join(map(str, t2ids.values())) + + ids = await get_symbol_id_seq(tuple(tickers)) try: quotes_resp = await client.api.quotes(ids=ids) except (QuestradeError, BrokerError) as qterr: - if "Access token is invalid" in str(qterr.args[0]): - # out-of-process piker actor may have - # renewed already.. - client._reload_config() - try: - quotes_resp = await client.api.quotes(ids=ids) - except BrokerError as qterr: - if "Access token is invalid" in str(qterr.args[0]): - # TODO: this will crash when run from a sub-actor since - # STDIN can't be acquired. The right way to handle this - # is to make a request to the parent actor (i.e. - # spawner of this) to call this - # `client.ensure_access()` locally thus blocking until - # the user provides an API key on the "client side" - await client.ensure_access(force_refresh=True) - quotes_resp = await client.api.quotes(ids=ids) - else: + if "Access token is invalid" not in str(qterr.args[0]): raise + # out-of-process piker actor may have + # renewed already.. + client._reload_config() + try: + quotes_resp = await client.api.quotes(ids=ids) + except BrokerError as qterr: + if "Access token is invalid" in str(qterr.args[0]): + # TODO: this will crash when run from a sub-actor since + # STDIN can't be acquired. The right way to handle this + # is to make a request to the parent actor (i.e. + # spawner of this) to call this + # `client.ensure_access()` locally thus blocking until + # the user provides an API key on the "client side" + await client.ensure_access(force_refresh=True) + quotes_resp = await client.api.quotes(ids=ids) - # dict packing and post-processing - quotes = {} - for quote in quotes_resp['quotes']: - quotes[quote['symbol']] = quote - + # post-processing + for quote in quotes_resp: if quote.get('delay', 0) > 0: log.warn(f"Delayed quote:\n{quote}") - return quotes + return quotes_resp - # strip out unknown/invalid symbols - first_quotes_dict = await get_quote(tickers) - filter_symbols(first_quotes_dict) - # re-save symbol -> ids cache - ids = ','.join(map(str, t2ids.values())) + return get_quote + + +async def option_quoter(client: Client, tickers: List[str]): + """Option quoter context. + """ + # sanity + if isinstance(tickers[0], tuple): + datetime.fromisoformat(tickers[0][1]) + else: + log.warn(f"Ignoring option quoter call with {tickers}") + # TODO make caller always check that a quoter has been set + return + + @async_lifo_cache(maxsize=128) + async def get_contract_by_date(sym_date_pairs: Tuple[Tuple[str, str]]): + """For each tuple, + ``(symbol_date_1, symbol_date_2, ... , symbol_date_n)`` + return a contract dict. + """ + symbols = map(itemgetter(0), sym_date_pairs) + dates = map(itemgetter(1), sym_date_pairs) + contracts = await client.get_all_contracts(symbols) + selected = {} + for key, val in contracts.items(): + if key.expiry in dates: + selected[key] = val + + return selected + + async def get_quote(symbol_date_pairs): + """Query for quotes using cached symbol ids. + """ + contracts = await get_contract_by_date( + tuple(symbol_date_pairs)) + try: + quotes = await client.option_chains(contracts) + except (QuestradeError, BrokerError) as qterr: + if "Access token is invalid" not in str(qterr.args[0]): + raise + # out-of-process piker actor may have + # renewed already.. + client._reload_config() + try: + quotes = await client.option_chains(contracts) + except BrokerError as qterr: + if "Access token is invalid" in str(qterr.args[0]): + # TODO: this will crash when run from a sub-actor since + # STDIN can't be acquired. The right way to handle this + # is to make a request to the parent actor (i.e. + # spawner of this) to call this + # `client.ensure_access()` locally thus blocking until + # the user provides an API key on the "client side" + await client.ensure_access(force_refresh=True) + quotes = await client.option_chains(contracts) + + return quotes return get_quote diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py index 63cd8af7..ff1e22d4 100644 --- a/piker/brokers/robinhood.py +++ b/piker/brokers/robinhood.py @@ -2,6 +2,7 @@ Robinhood API backend. """ from functools import partial +from typing import List from async_generator import asynccontextmanager # TODO: move to urllib3/requests once supported @@ -44,7 +45,7 @@ class Client: self._sess.base_location = _service_ep self.api = _API(self._sess) - def _zip_in_order(self, symbols: [str], results_dict: dict): + def _zip_in_order(self, symbols: [str], quotes: List[dict]): return {quote.get('symbol', sym) if quote else sym: quote for sym, quote in zip(symbols, results_dict)} @@ -52,11 +53,16 @@ class Client: """Retrieve quotes for a list of ``symbols``. """ try: - resp = await self.api.quotes(','.join(symbols)) + quotes = (await self.api.quotes(','.join(symbols)))['results'] except BrokerError: - resp = {'results': [None] * len(symbols)} + quotes = [None] * len(symbols) - return self._zip_in_order(symbols, resp['results']) + for quote in quotes: + # insert our subscription key field + if quote is not None: + quote['key'] = quote['symbol'] + + return list(filter(bool, quotes)) async def symbol_data(self, symbols: [str]): """Retrieve symbol data via the ``fundamentals`` endpoint. diff --git a/piker/cli.py b/piker/cli.py index 3ea0eaac..6dd3b327 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -4,6 +4,8 @@ Console interface to broker client/daemons. from functools import partial import json import os +from operator import attrgetter +from operator import itemgetter import click import pandas as pd @@ -32,11 +34,6 @@ def pikerd(loglevel, host, tl): get_console_log(loglevel) tractor.run_daemon( rpc_module_paths=['piker.brokers.data'], - statespace={ - 'broker2tickersubs': {}, - 'clients': {}, - 'dtasks': set(), - }, name='brokerd', loglevel=loglevel if tl else None, ) @@ -105,6 +102,12 @@ def quote(loglevel, broker, tickers, df_output): log.error(f"No quotes could be found for {tickers}?") return + if len(quotes) < len(tickers): + syms = tuple(map(itemgetter('symbol'), quotes)) + for ticker in tickers: + if ticker not in syms: + brokermod.log.warn(f"Could not find symbol {ticker}?") + if df_output: cols = next(filter(bool, quotes.values())).copy() cols.pop('symbol') @@ -130,11 +133,6 @@ async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None): "No broker daemon could be found, spawning brokerd..") portal = await nursery.start_actor( 'brokerd', - statespace={ - 'broker2tickersubs': {}, - 'clients': {}, - 'dtasks': set(), - }, rpc_module_paths=['piker.brokers.data'], loglevel=loglevel, ) @@ -168,23 +166,10 @@ def monitor(loglevel, broker, rate, name, dhost, test, tl): async with maybe_spawn_brokerd_as_subactor( tries=tries, loglevel=loglevel ) as portal: - if test: - # stream from a local test file - agen = await portal.run( - "piker.brokers.data", 'stream_from_file', - filename=test - ) - # agen = data.stream_from_file(test) - else: - # start live streaming from broker daemon - agen = await portal.run( - "piker.brokers.data", 'start_quote_stream', - broker=brokermod.name, tickers=tickers) - # run app "main" await _async_main( name, portal, tickers, - brokermod, rate, agen, + brokermod, rate, test=test, ) tractor.run( @@ -330,14 +315,16 @@ def dump(ctx, name): def contracts(loglevel, broker, symbol, ids): brokermod = get_brokermod(broker) get_console_log(loglevel) - quotes = trio.run(partial(core.contracts, brokermod, symbol)) + contracts = trio.run(partial(core.contracts, brokermod, symbol)) if not ids: # just print out expiry dates which can be used with # the option_chain_quote cmd - id, contracts = next(iter(quotes.items())) - quotes = list(contracts) + output = tuple(map(attrgetter('expiry'), contracts)) + else: + output = tuple(contracts.items()) - click.echo(colorize_json(quotes)) + # TODO: need a cli test to verify + click.echo(colorize_json(output)) @cli.command() @@ -358,17 +345,15 @@ def optsquote(loglevel, broker, symbol, df_output, date): partial( core.option_chain, brokermod, symbol, date ) - )[symbol] + ) if not quotes: - log.error(f"No quotes could be found for {symbol}?") + log.error(f"No option quotes could be found for {symbol}?") return if df_output: - cols = next(filter(bool, quotes.values())).copy() df = pd.DataFrame( - (quote.values() for contract, quote in quotes.items()), - index=quotes.keys(), - columns=cols.keys(), + (quote.values() for quote in quotes), + columns=quotes[0].keys(), ) click.echo(df) else: diff --git a/piker/ui/monitor.py b/piker/ui/monitor.py index 5f159ace..44480a61 100644 --- a/piker/ui/monitor.py +++ b/piker/ui/monitor.py @@ -19,6 +19,7 @@ from kivy.lang import Builder from kivy import utils from kivy.app import async_runTouchApp from kivy.core.window import Window +from async_generator import aclosing from ..log import get_logger from .pager import PagerView @@ -513,13 +514,24 @@ async def _async_main( tickers: List[str], brokermod: ModuleType, rate: int, - # an async generator instance which yields quotes dict packets - quote_gen: AsyncGeneratorType, + test: bool = False ) -> None: '''Launch kivy app + all other related tasks. This is started with cli cmd `piker monitor`. ''' + if test: + # stream from a local test file + quote_gen = await portal.run( + "piker.brokers.data", 'stream_from_file', + filename=test + ) + else: + # start live streaming from broker daemon + quote_gen = await portal.run( + "piker.brokers.data", 'start_quote_stream', + broker=brokermod.name, symbols=tickers) + # subscribe for tickers (this performs a possible filtering # where invalid symbols are discarded) sd = await portal.run( @@ -594,14 +606,17 @@ async def _async_main( try: # Trio-kivy entry point. await async_runTouchApp(widgets['root']) # run kivy - await quote_gen.aclose() # cancel aysnc gen call finally: + await quote_gen.aclose() # cancel aysnc gen call # un-subscribe from symbols stream (cancel if brokerd # was already torn down - say by SIGINT) with trio.move_on_after(0.2): await portal.run( "piker.brokers.data", 'modify_quote_stream', - broker=brokermod.name, tickers=[]) + broker=brokermod.name, + feed_type='stock', + symbols=[] + ) # cancel GUI update task nursery.cancel_scope.cancel() diff --git a/tests/conftest.py b/tests/conftest.py index f7688a9d..76a8ddee 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,20 @@ import pytest +import tractor +from piker import log + + +def pytest_addoption(parser): + parser.addoption("--ll", action="store", dest='loglevel', + default=None, help="logging level to set when testing") + + +@pytest.fixture(scope='session', autouse=True) +def loglevel(request): + orig = tractor.log._default_loglevel + level = tractor.log._default_loglevel = request.config.option.loglevel + log.get_console_log(level) + yield level + tractor.log._default_loglevel = orig @pytest.fixture diff --git a/tests/test_cli.py b/tests/test_cli.py index 2273410a..ec46c004 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -18,11 +18,11 @@ def run(cmd, *args): return cp -def verify_keys(tickers, quotes_dict): +def verify_keys(tickers, quotes): """Verify all ticker names are keys in ``quotes_dict``. """ - for key, quote in quotes_dict.items(): - assert key in tickers + for quote in quotes: + assert quote['key'] in tickers @pytest.fixture @@ -39,8 +39,8 @@ def test_known_quotes(capfd, nyse_tickers): # verify output can be parsed as json out, err = capfd.readouterr() - quotes_dict = json.loads(out) - verify_keys(nyse_tickers, quotes_dict) + quotes = json.loads(out) + verify_keys(nyse_tickers, quotes) @pytest.mark.parametrize( @@ -61,8 +61,8 @@ def test_quotes_ticker_not_found( out, err = capfd.readouterr() if out: # verify output can be parsed as json - quotes_dict = json.loads(out) - verify_keys(tickers, quotes_dict) + quotes = json.loads(out) + verify_keys(tickers, quotes) # check for warning log message when some quotes are found warnmsg = f'Could not find symbol {bad_ticker[0]}' assert warnmsg in err diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 897ba531..482c1e37 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -4,7 +4,9 @@ Questrade broker testing import time import trio +import tractor from trio.testing import trio_test +from tractor.testing import tractor_test from piker.brokers import questrade as qt import pytest @@ -18,72 +20,75 @@ def check_qt_conf_section(brokerconf): # stock quote -_ex_quote = { - "VWAP": 7.383792, - "askPrice": 7.56, - "askSize": 2, - "bidPrice": 6.1, - "bidSize": 2, - "delay": 0, - "high52w": 9.68, - "highPrice": 8, - "isHalted": 'false', - "lastTradePrice": 6.96, - "lastTradePriceTrHrs": 6.97, - "lastTradeSize": 2000, - "lastTradeTick": "Down", - "lastTradeTime": "2018-02-07T15:59:59.259000-05:00", - "low52w": 1.03, - "lowPrice": 6.88, - "openPrice": 7.64, - "symbol": "EMH.VN", - "symbolId": 10164524, - "tier": "", - "volume": 5357805 +_ex_quotes = { + 'stock': { + "VWAP": 7.383792, + "askPrice": 7.56, + "askSize": 2, + "bidPrice": 6.1, + "bidSize": 2, + "delay": 0, + "high52w": 9.68, + "highPrice": 8, + "key": "EMH.VN", + "isHalted": 'false', + "lastTradePrice": 6.96, + "lastTradePriceTrHrs": 6.97, + "lastTradeSize": 2000, + "lastTradeTick": "Down", + "lastTradeTime": "2018-02-07T15:59:59.259000-05:00", + "low52w": 1.03, + "lowPrice": 6.88, + "openPrice": 7.64, + "symbol": "EMH.VN", + "symbolId": 10164524, + "tier": "", + "volume": 5357805 + }, + 'option': { + 'VWAP': 0, + 'askPrice': None, + 'askSize': 0, + 'bidPrice': None, + 'bidSize': 0, + 'delay': 0, + 'delta': -0.212857, + 'gamma': 0.003524, + 'highPrice': 0, + 'isHalted': False, + "key": ["WEED.TO", '2018-10-23T00:00:00.000000-04:00'], + 'lastTradePrice': 22, + 'lastTradePriceTrHrs': None, + 'lastTradeSize': 0, + 'lastTradeTick': 'Equal', + 'lastTradeTime': '2018-10-23T00:00:00.000000-04:00', + 'lowPrice': 0, + 'openInterest': 1, + 'openPrice': 0, + 'rho': -0.891868, + 'symbol': 'WEED15Jan21P54.00.MX', + 'symbolId': 22739148, + 'theta': -0.012911, + 'underlying': 'WEED.TO', + 'underlyingId': 16529510, + 'vega': 0.220885, + 'volatility': 75.514171, + 'volume': 0 + } } -# option quote -_ex_contract = { - 'VWAP': 0, - 'askPrice': None, - 'askSize': 0, - 'bidPrice': None, - 'bidSize': 0, - 'delay': 0, - 'delta': -0.212857, - 'gamma': 0.003524, - 'highPrice': 0, - 'isHalted': False, - 'lastTradePrice': 22, - 'lastTradePriceTrHrs': None, - 'lastTradeSize': 0, - 'lastTradeTick': 'Equal', - 'lastTradeTime': '2018-10-23T00:00:00.000000-04:00', - 'lowPrice': 0, - 'openInterest': 1, - 'openPrice': 0, - 'rho': -0.891868, - 'symbol': 'WEED15Jan21P54.00.MX', - 'symbolId': 22739148, - 'theta': -0.012911, - 'underlying': 'WEED.TO', - 'underlyingId': 16529510, - 'vega': 0.220885, - 'volatility': 75.514171, - 'volume': 0 -} - - -def match_packet(symbols, quotes): +def match_packet(symbols, quotes, feed_type='stock'): """Verify target ``symbols`` match keys in ``quotes`` packet. """ assert len(quotes) == len(symbols) - for ticker in symbols: - quote = quotes.pop(ticker) + # for ticker in symbols: + for quote in quotes.copy(): + assert quote['key'] in symbols + quotes.remove(quote) # verify the quote packet format hasn't changed - for key in _ex_quote: + for key in _ex_quotes[feed_type]: quote.pop(key) # no additional fields either @@ -104,11 +109,11 @@ async def test_batched_stock_quote(us_symbols): @trio_test -async def test_quoter_context(us_symbols): +async def test_stock_quoter_context(us_symbols): """Test that a quoter "context" used by the data feed daemon. """ async with qt.get_client() as client: - quoter = await qt.quoter(client, us_symbols) + quoter = await qt.stock_quoter(client, us_symbols) quotes = await quoter(us_symbols) match_packet(us_symbols, quotes) @@ -119,12 +124,14 @@ async def test_option_contracts(tmx_symbols): """ async with qt.get_client() as client: for symbol in tmx_symbols: - id, contracts = await client.symbol2contracts(symbol) - assert isinstance(id, int) - assert isinstance(contracts, dict) - for dt in contracts: - assert dt.isoformat( - timespec='microseconds') == contracts[dt]['expiryDate'] + contracts = await client.symbol2contracts(symbol) + key, byroot = next(iter(contracts.items())) + assert isinstance(key.id, int) + assert isinstance(byroot, dict) + for key in contracts: + # check that datetime is same as reported in contract + assert key.expiry.isoformat( + timespec='microseconds') == contracts[key]['expiryDate'] @trio_test @@ -133,17 +140,15 @@ async def test_option_chain(tmx_symbols): """ async with qt.get_client() as client: # contract lookup - should be cached - contracts = await client.get_all_contracts(tmx_symbols) + contracts = await client.get_all_contracts([tmx_symbols[0]]) # chains quote for all symbols quotes = await client.option_chains(contracts) - for key in tmx_symbols: - contracts = quotes.pop(key) - for key, quote in contracts.items(): - for key in _ex_contract: - quote.pop(key) - assert not quote - # chains for each symbol were retreived - assert not quotes + # verify contents match what we expect + for quote in quotes: + assert quote['underlying'] in tmx_symbols + for key in _ex_quotes['option']: + quote.pop(key) + assert not quote @trio_test @@ -163,7 +168,7 @@ async def test_option_quote_latency(tmx_symbols): # NOTE: request latency is usually 2x faster that these (5, contracts), (0.5, single) ]: - for _ in range(10): + for _ in range(3): # chains quote for all symbols start = time.time() await client.option_chains(contract) @@ -171,3 +176,117 @@ async def test_option_quote_latency(tmx_symbols): print(f"Request took {took}") assert took <= expected_latency await trio.sleep(0.1) + + +async def stream_option_chain(portal, symbols): + """Start up an option quote stream. + + ``symbols`` arg is ignored here. + """ + symbol = 'APHA.TO' # your fave greenhouse LP + async with qt.get_client() as client: + contracts = await client.get_all_contracts([symbol]) + + contractkey = next(iter(contracts)) + subs_keys = list( + map(lambda item: (item.symbol, item.expiry), contracts)) + sub = subs_keys[0] + + agen = await portal.run( + 'piker.brokers.data', + 'start_quote_stream', + broker='questrade', + symbols=[sub], + feed_type='option', + diff_cached=False, + ) + try: + # wait on the data streamer to actually start + # delivering + await agen.__anext__() + + # it'd sure be nice to have an asyncitertools here... + with trio.fail_after(2.1): + loops = 8 + count = 0 + async for quotes in agen: + # print(f'got quotes for {quotes.keys()}') + # we should receive all calls and puts + assert len(quotes) == len(contracts[contractkey]) * 2 + for symbol, quote in quotes.items(): + assert quote['key'] == sub + for key in _ex_quotes['option']: + quote.pop(key) + assert not quote + count += 1 + if count == loops: + break + finally: + # unsub + await portal.run( + 'piker.brokers.data', + 'modify_quote_stream', + broker='questrade', + feed_type='option', + symbols=[], + ) + + +async def stream_stocks(portal, symbols): + """Start up a stock quote stream. + """ + agen = await portal.run( + 'piker.brokers.data', + 'start_quote_stream', + broker='questrade', + symbols=symbols, + ) + try: + # it'd sure be nice to have an asyncitertools here... + async for quotes in agen: + assert quotes + for key in quotes: + assert key in symbols + break + finally: + # unsub + await portal.run( + 'piker.brokers.data', + 'modify_quote_stream', + broker='questrade', + feed_type='stock', + symbols=[], + ) + + +@pytest.mark.parametrize( + 'stream_what', + [ + (stream_stocks,), + (stream_option_chain,), + (stream_stocks, stream_option_chain), + ], + ids=['stocks', 'options', 'stocks_and_options'], +) +@tractor_test +async def test_quote_streaming(tmx_symbols, loglevel, stream_what): + """Set up option streaming using the broker daemon. + """ + async with tractor.find_actor('brokerd') as portal: + async with tractor.open_nursery() as nursery: + # only one per host address, spawns an actor if None + if not portal: + # no brokerd actor found + portal = await nursery.start_actor( + 'data_feed', + rpc_module_paths=[ + 'piker.brokers.data', + 'piker.brokers.core' + ], + ) + async with trio.open_nursery() as n: + for func in stream_what: + n.start_soon(func, portal, tmx_symbols) + + # stop all spawned subactors + await nursery.cancel() diff --git a/tests/test_tractor.py b/tests/test_tractor.py deleted file mode 100644 index 7e63e3a7..00000000 --- a/tests/test_tractor.py +++ /dev/null @@ -1,61 +0,0 @@ -""" -Actor model API testing -""" -import pytest -import tractor - - -async def rx_price_quotes_from_brokerd(us_symbols): - """Verify we can spawn a daemon actor and retrieve streamed price data. - """ - async with tractor.find_actor('brokerd') as portals: - if not portals: - # only one per host address, spawns an actor if None - async with tractor.open_nursery() as nursery: - # no brokerd actor found - portal = await nursery.start_actor( - 'brokerd', - rpc_module_paths=['piker.brokers.data'], - statespace={ - 'broker2tickersubs': {}, - 'clients': {}, - 'dtasks': set() - }, - ) - - # gotta expose in a broker agnostic way... - # retrieve initial symbol data - # sd = await portal.run( - # 'piker.brokers.data', 'symbol_data', symbols=us_symbols) - # assert list(sd.keys()) == us_symbols - - gen = await portal.run( - 'piker.brokers.data', - 'start_quote_stream', - broker='robinhood', - tickers=us_symbols, - ) - # it'd sure be nice to have an asyncitertools here... - async for quotes in gen: - assert quotes - for key in quotes: - assert key in us_symbols - break - # terminate far-end async-gen - # await gen.asend(None) - # break - - # stop all spawned subactors - await nursery.cancel() - - # arbitter is cancelled here due to `find_actors()` internals - # (which internally uses `get_arbiter` which kills its channel - # server scope on exit) - - -def test_rx_price_quotes_from_brokerd(us_symbols): - tractor.run( - rx_price_quotes_from_brokerd, - us_symbols, - name='arbiter', - )