Merge pull request #60 from pikers/options_streaming

Options streaming
kivy_mainline_and_py3.8
goodboy 2018-12-01 18:35:28 -05:00 committed by GitHub
commit 0fbab8b831
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 729 additions and 496 deletions

204
Pipfile.lock generated
View File

@ -50,37 +50,37 @@
},
"cython": {
"hashes": [
"sha256:019008a69e6b7c102f2ed3d733a288d1784363802b437dd2b91e6256b12746da",
"sha256:1441fe19c56c90b8c2159d7b861c31a134d543ef7886fd82a5d267f9f11f35ac",
"sha256:1d1a5e9d6ed415e75a676b72200ad67082242ec4d2d76eb7446da255ae72d3f7",
"sha256:339f5b985de3662b1d6c69991ab46fdbdc736feb4ac903ef6b8c00e14d87f4d8",
"sha256:35bdf3f48535891fee2eaade70e91d5b2cc1ee9fc2a551847c7ec18bce55a92c",
"sha256:3d0afba0aec878639608f013045697fb0969ff60b3aea2daec771ea8d01ad112",
"sha256:42c53786806e24569571a7a24ebe78ec6b364fe53e79a3f27eddd573cacd398f",
"sha256:48b919da89614d201e72fbd8247b5ae8881e296cf968feb5595a015a14c67f1f",
"sha256:49906e008eeb91912654a36c200566392bd448b87a529086694053a280f8af2d",
"sha256:49fc01a7c9c4e3c1784e9a15d162c2cac3990fcc28728227a6f8f0837aabda7c",
"sha256:501b671b639b9ca17ad303f8807deb1d0ff754d1dab106f2607d14b53cb0ff0b",
"sha256:5574574142364804423ab4428bd331a05c65f7ecfd31ac97c936f0c720fe6a53",
"sha256:6092239a772b3c6604be9e94b9ab4f0dacb7452e8ad299fd97eae0611355b679",
"sha256:71ff5c7632501c4f60edb8a24fd0a772e04c5bdca2856d978d04271b63666ef7",
"sha256:7dcf2ad14e25b05eda8bdd104f8c03a642a384aeefd25a5b51deac0826e646fa",
"sha256:8ca3a99f5a7443a6a8f83a5d8fcc11854b44e6907e92ba8640d8a8f7b9085e21",
"sha256:927da3b5710fb705aab173ad630b45a4a04c78e63dcd89411a065b2fe60e4770",
"sha256:94916d1ede67682638d3cc0feb10648ff14dc51fb7a7f147f4fedce78eaaea97",
"sha256:a3e5e5ca325527d312cdb12a4dab8b0459c458cad1c738c6f019d0d8d147081c",
"sha256:a7716a98f0b9b8f61ddb2bae7997daf546ac8fc594be6ba397f4bde7d76bfc62",
"sha256:acf10d1054de92af8d5bfc6620bb79b85f04c98214b4da7db77525bfa9fc2a89",
"sha256:de46ffb67e723975f5acab101c5235747af1e84fbbc89bf3533e2ea93fb26947",
"sha256:df428969154a9a4cd9748c7e6efd18432111fbea3d700f7376046c38c5e27081",
"sha256:f5ebf24b599caf466f9da8c4115398d663b2567b89e92f58a835e9da4f74669f",
"sha256:f79e45d5c122c4fb1fd54029bf1d475cecc05f4ed5b68136b0d6ec268bae68b6",
"sha256:f7a43097d143bd7846ffba6d2d8cd1cc97f233318dbd0f50a235ea01297a096b",
"sha256:fceb8271bc2fd3477094ca157c824e8ea840a7b393e89e766eea9a3b9ce7e0c6",
"sha256:ff919ceb40259f5332db43803aa6c22ff487e86036ce3921ae04b9185efc99a4"
"sha256:0202f753b0a69dd87095b698df00010daf452ab61279747248a042a24892a2a9",
"sha256:0fbe9514ffe35aad337db27b11f7ee1bf27d01059b2e27f112315b185d69de79",
"sha256:18ab7646985a97e02cee72e1ddba2e732d4931d4e1732494ff30c5aa084bfb97",
"sha256:18bb95daa41fd2ff0102844172bc068150bf031186249fc70c6f57fc75c9c0a9",
"sha256:222c65c7022ff52faf3ac6c706e4e8a726ddaa29dabf2173b2a0fdfc1a2f1586",
"sha256:2387c5a2a436669de9157d117fd426dfc2b46ffdc49e43f0a2267380896c04ea",
"sha256:31bad130b701587ab7e74c3c304bb3d63d9f0d365e3f81880203e8e476d914b1",
"sha256:3895014b1a653726a9da5aca852d9e6d0e2c2667bf315d6a2cd632bf7463130b",
"sha256:3d38967ef9c1c0ffabe80827f56817609153e2da83e3dce84476d0928c72972c",
"sha256:5478efd92291084adc9b679666aeaeaafca69d6bf3e95fe3efce82814e3ab782",
"sha256:5c2a6121e4e1e65690b60c270012218e38201bcf700314b1926d5dbeae78a499",
"sha256:5f66f7f76fc870500fe6db0c02d5fc4187062d29e582431f5a986881c5aef4e3",
"sha256:6572d74990b16480608441b941c1cefd60bf742416bc3668cf311980f740768d",
"sha256:6990b9965f31762ac71340869c064f39fb6776beca396d0558d3b5b1ebb7f027",
"sha256:87c82803f9c51c275b16c729aade952ca93c74a8aec963b9b8871df9bbb3120a",
"sha256:8fd32974024052b2260d08b94f970c4c1d92c327ed3570a2b4708070fa53a879",
"sha256:9a81bba33c7fbdb76e6fe8d15b6e793a1916afd4d2463f07d762c69efaaea466",
"sha256:9c31cb9bfaa1004a2a50115a37e1fcb79d664917968399dae3e04610356afe8c",
"sha256:a0b28235c28a088e052f90a0b5fefaa503e5378046a29d0af045e2ec9d5d6555",
"sha256:a3f5022d818b6c91a8bbc466211e6fd708f234909cbb10bc4dbccb2a04884ef6",
"sha256:a7252ca498f510404185e3c1bdda3224e80b1be1a5fbc2b174aab83a477ea0cb",
"sha256:aa8d7136cad8b2a7bf3596e1bc053476edeee567271f197449b2d30ea0c37175",
"sha256:b50a8de6f2820286129fe7d71d76c9e0c0f53a8c83cf39bbe6375b827994e4f1",
"sha256:b528a9c152c569062375d5c2260b59f8243bb4136fc38420854ac1bd4aa0d02f",
"sha256:b72db7201a4aa0445f27af9954d48ed7d2c119ce3b8b253e4dcd514fc72e5dc6",
"sha256:d3444e10ccb5b16e4c1bed3cb3c565ec676b20a21eb43430e70ec4168c631dcc",
"sha256:e16d6f06f4d2161347e51c4bc1f7a8feedeee444d26efa92243f18441a6fa742",
"sha256:f5774bef92d33a62a584f6e7552a9a8653241ecc036e259bfb03d33091599537"
],
"index": "pypi",
"version": "==0.29"
"version": "==0.29.1"
},
"e1839a8": {
"editable": true,
@ -112,24 +112,16 @@
},
"msgpack": {
"hashes": [
"sha256:0b3b1773d2693c70598585a34ca2715873ba899565f0a7c9a1545baef7e7fbdc",
"sha256:0bae5d1538c5c6a75642f75a1781f3ac2275d744a92af1a453c150da3446138b",
"sha256:0ee8c8c85aa651be3aa0cd005b5931769eaa658c948ce79428766f1bd46ae2c3",
"sha256:1369f9edba9500c7a6489b70fdfac773e925342f4531f1e3d4c20ac3173b1ae0",
"sha256:22d9c929d1d539f37da3d1b0e16270fa9d46107beab8c0d4d2bddffffe895cee",
"sha256:2ff43e3247a1e11d544017bb26f580a68306cec7a6257d8818893c1fda665f42",
"sha256:31a98047355d34d047fcdb55b09cb19f633cf214c705a765bd745456c142130c",
"sha256:8767eb0032732c3a0da92cbec5ac186ef89a3258c6edca09161472ca0206c45f",
"sha256:8acc8910218555044e23826980b950e96685dc48124a290c86f6f41a296ea172",
"sha256:ab189a6365be1860a5ecf8159c248f12d33f79ea799ae9695fa6a29896dcf1d4",
"sha256:cfd6535feb0f1cf1c7cdb25773e965cc9f92928244a8c3ef6f8f8a8e1f7ae5c4",
"sha256:e274cd4480d8c76ec467a85a9c6635bbf2258f0649040560382ab58cabb44bcf",
"sha256:f86642d60dca13e93260187d56c2bef2487aa4d574a669e8ceefcf9f4c26fd00",
"sha256:f8a57cbda46a94ed0db55b73e6ab0c15e78b4ede8690fa491a0e55128d552bb0",
"sha256:fcea97a352416afcbccd7af9625159d80704a25c519c251c734527329bb20d0e"
"sha256:102802a9433dcf36f939b632cce9dea87310b2f163bb37ffc8bc343677726e88",
"sha256:64abc6bf3a2ac301702f5760f4e6e227d0fd4d84d9014ef9a40faa9d43365259",
"sha256:72259661a83f8b08ef6ee83927ce4937f841226735824af5b10a536d886eeb36",
"sha256:85f1342b9d7549dd3daf494100d47a3dc7daae703cdbfc2c9ee7bbdc8a492cba",
"sha256:8ce9f88b6cb75d74eda2a5522e5c2e5ec0f17fd78605d6502abb61f46b306865",
"sha256:9936ce3a530ca78db60b6631003b5f4ba383cfb1d9830a27d1b5c61857226e2f",
"sha256:cb4e228f3d93779a1d77a1e9d72759b79dfa2975c1a5bd2a090eaa98239fa4b1"
],
"index": "pypi",
"version": "==0.5.6"
"version": "==0.6.0"
},
"multio": {
"hashes": [
@ -204,17 +196,17 @@
},
"pdbpp": {
"hashes": [
"sha256:dde77326e4ea41439c243ed065826d53539530eeabd1b6615aae15cfbb9fda05"
"sha256:535085916fcfb768690ba0aeab2967c2a2163a0a60e5b703776846873e171399"
],
"index": "pypi",
"version": "==0.9.2"
"version": "==0.9.3"
},
"pygments": {
"hashes": [
"sha256:78f3f434bcc5d6ee09020f92ba487f95ba50f1e3ef83ae96b9d5ffa1bab25c5d",
"sha256:dbae1046def0efb574852fab9e90209b23f556367b5a320c0bcb871c77c3e8cc"
"sha256:6301ecb0997a52d2d31385e62d0a4a4cf18d2f2da7054a5ddad5c366cd39cee7",
"sha256:82666aac15622bd7bb685a4ee7f6625dd716da3ef7473620c192c0168aae64fc"
],
"version": "==2.2.0"
"version": "==2.3.0"
},
"python-dateutil": {
"hashes": [
@ -246,14 +238,14 @@
},
"sortedcontainers": {
"hashes": [
"sha256:220bb2e3e1886297fd7cdd6d164cb5cf237be1cfae1a3a3e526d149c52816682",
"sha256:b74f2756fb5e23512572cc76f0fe0832fd86310f77dfee54335a35fb33f6b950"
"sha256:974e9a32f56b17c1bac2aebd9dcf197f3eb9cd30553c5852a3187ad162e1a03a",
"sha256:d9e96492dd51fae31e60837736b38fe42a187b5404c16606ff7ee7cd582d4c60"
],
"version": "==2.0.5"
"version": "==2.1.0"
},
"tractor": {
"git": "git://github.com/tgoodlet/tractor.git",
"ref": "71bb87aa3a249af37ec68d00b0a5853f58923f1e"
"ref": "c0cdb3945a9a9538b65bd76038f263e859fbbfe7"
},
"trio": {
"hashes": [
@ -314,37 +306,37 @@
},
"cython": {
"hashes": [
"sha256:019008a69e6b7c102f2ed3d733a288d1784363802b437dd2b91e6256b12746da",
"sha256:1441fe19c56c90b8c2159d7b861c31a134d543ef7886fd82a5d267f9f11f35ac",
"sha256:1d1a5e9d6ed415e75a676b72200ad67082242ec4d2d76eb7446da255ae72d3f7",
"sha256:339f5b985de3662b1d6c69991ab46fdbdc736feb4ac903ef6b8c00e14d87f4d8",
"sha256:35bdf3f48535891fee2eaade70e91d5b2cc1ee9fc2a551847c7ec18bce55a92c",
"sha256:3d0afba0aec878639608f013045697fb0969ff60b3aea2daec771ea8d01ad112",
"sha256:42c53786806e24569571a7a24ebe78ec6b364fe53e79a3f27eddd573cacd398f",
"sha256:48b919da89614d201e72fbd8247b5ae8881e296cf968feb5595a015a14c67f1f",
"sha256:49906e008eeb91912654a36c200566392bd448b87a529086694053a280f8af2d",
"sha256:49fc01a7c9c4e3c1784e9a15d162c2cac3990fcc28728227a6f8f0837aabda7c",
"sha256:501b671b639b9ca17ad303f8807deb1d0ff754d1dab106f2607d14b53cb0ff0b",
"sha256:5574574142364804423ab4428bd331a05c65f7ecfd31ac97c936f0c720fe6a53",
"sha256:6092239a772b3c6604be9e94b9ab4f0dacb7452e8ad299fd97eae0611355b679",
"sha256:71ff5c7632501c4f60edb8a24fd0a772e04c5bdca2856d978d04271b63666ef7",
"sha256:7dcf2ad14e25b05eda8bdd104f8c03a642a384aeefd25a5b51deac0826e646fa",
"sha256:8ca3a99f5a7443a6a8f83a5d8fcc11854b44e6907e92ba8640d8a8f7b9085e21",
"sha256:927da3b5710fb705aab173ad630b45a4a04c78e63dcd89411a065b2fe60e4770",
"sha256:94916d1ede67682638d3cc0feb10648ff14dc51fb7a7f147f4fedce78eaaea97",
"sha256:a3e5e5ca325527d312cdb12a4dab8b0459c458cad1c738c6f019d0d8d147081c",
"sha256:a7716a98f0b9b8f61ddb2bae7997daf546ac8fc594be6ba397f4bde7d76bfc62",
"sha256:acf10d1054de92af8d5bfc6620bb79b85f04c98214b4da7db77525bfa9fc2a89",
"sha256:de46ffb67e723975f5acab101c5235747af1e84fbbc89bf3533e2ea93fb26947",
"sha256:df428969154a9a4cd9748c7e6efd18432111fbea3d700f7376046c38c5e27081",
"sha256:f5ebf24b599caf466f9da8c4115398d663b2567b89e92f58a835e9da4f74669f",
"sha256:f79e45d5c122c4fb1fd54029bf1d475cecc05f4ed5b68136b0d6ec268bae68b6",
"sha256:f7a43097d143bd7846ffba6d2d8cd1cc97f233318dbd0f50a235ea01297a096b",
"sha256:fceb8271bc2fd3477094ca157c824e8ea840a7b393e89e766eea9a3b9ce7e0c6",
"sha256:ff919ceb40259f5332db43803aa6c22ff487e86036ce3921ae04b9185efc99a4"
"sha256:0202f753b0a69dd87095b698df00010daf452ab61279747248a042a24892a2a9",
"sha256:0fbe9514ffe35aad337db27b11f7ee1bf27d01059b2e27f112315b185d69de79",
"sha256:18ab7646985a97e02cee72e1ddba2e732d4931d4e1732494ff30c5aa084bfb97",
"sha256:18bb95daa41fd2ff0102844172bc068150bf031186249fc70c6f57fc75c9c0a9",
"sha256:222c65c7022ff52faf3ac6c706e4e8a726ddaa29dabf2173b2a0fdfc1a2f1586",
"sha256:2387c5a2a436669de9157d117fd426dfc2b46ffdc49e43f0a2267380896c04ea",
"sha256:31bad130b701587ab7e74c3c304bb3d63d9f0d365e3f81880203e8e476d914b1",
"sha256:3895014b1a653726a9da5aca852d9e6d0e2c2667bf315d6a2cd632bf7463130b",
"sha256:3d38967ef9c1c0ffabe80827f56817609153e2da83e3dce84476d0928c72972c",
"sha256:5478efd92291084adc9b679666aeaeaafca69d6bf3e95fe3efce82814e3ab782",
"sha256:5c2a6121e4e1e65690b60c270012218e38201bcf700314b1926d5dbeae78a499",
"sha256:5f66f7f76fc870500fe6db0c02d5fc4187062d29e582431f5a986881c5aef4e3",
"sha256:6572d74990b16480608441b941c1cefd60bf742416bc3668cf311980f740768d",
"sha256:6990b9965f31762ac71340869c064f39fb6776beca396d0558d3b5b1ebb7f027",
"sha256:87c82803f9c51c275b16c729aade952ca93c74a8aec963b9b8871df9bbb3120a",
"sha256:8fd32974024052b2260d08b94f970c4c1d92c327ed3570a2b4708070fa53a879",
"sha256:9a81bba33c7fbdb76e6fe8d15b6e793a1916afd4d2463f07d762c69efaaea466",
"sha256:9c31cb9bfaa1004a2a50115a37e1fcb79d664917968399dae3e04610356afe8c",
"sha256:a0b28235c28a088e052f90a0b5fefaa503e5378046a29d0af045e2ec9d5d6555",
"sha256:a3f5022d818b6c91a8bbc466211e6fd708f234909cbb10bc4dbccb2a04884ef6",
"sha256:a7252ca498f510404185e3c1bdda3224e80b1be1a5fbc2b174aab83a477ea0cb",
"sha256:aa8d7136cad8b2a7bf3596e1bc053476edeee567271f197449b2d30ea0c37175",
"sha256:b50a8de6f2820286129fe7d71d76c9e0c0f53a8c83cf39bbe6375b827994e4f1",
"sha256:b528a9c152c569062375d5c2260b59f8243bb4136fc38420854ac1bd4aa0d02f",
"sha256:b72db7201a4aa0445f27af9954d48ed7d2c119ce3b8b253e4dcd514fc72e5dc6",
"sha256:d3444e10ccb5b16e4c1bed3cb3c565ec676b20a21eb43430e70ec4168c631dcc",
"sha256:e16d6f06f4d2161347e51c4bc1f7a8feedeee444d26efa92243f18441a6fa742",
"sha256:f5774bef92d33a62a584f6e7552a9a8653241ecc036e259bfb03d33091599537"
],
"index": "pypi",
"version": "==0.29"
"version": "==0.29.1"
},
"fancycompleter": {
"hashes": [
@ -376,24 +368,16 @@
},
"msgpack": {
"hashes": [
"sha256:0b3b1773d2693c70598585a34ca2715873ba899565f0a7c9a1545baef7e7fbdc",
"sha256:0bae5d1538c5c6a75642f75a1781f3ac2275d744a92af1a453c150da3446138b",
"sha256:0ee8c8c85aa651be3aa0cd005b5931769eaa658c948ce79428766f1bd46ae2c3",
"sha256:1369f9edba9500c7a6489b70fdfac773e925342f4531f1e3d4c20ac3173b1ae0",
"sha256:22d9c929d1d539f37da3d1b0e16270fa9d46107beab8c0d4d2bddffffe895cee",
"sha256:2ff43e3247a1e11d544017bb26f580a68306cec7a6257d8818893c1fda665f42",
"sha256:31a98047355d34d047fcdb55b09cb19f633cf214c705a765bd745456c142130c",
"sha256:8767eb0032732c3a0da92cbec5ac186ef89a3258c6edca09161472ca0206c45f",
"sha256:8acc8910218555044e23826980b950e96685dc48124a290c86f6f41a296ea172",
"sha256:ab189a6365be1860a5ecf8159c248f12d33f79ea799ae9695fa6a29896dcf1d4",
"sha256:cfd6535feb0f1cf1c7cdb25773e965cc9f92928244a8c3ef6f8f8a8e1f7ae5c4",
"sha256:e274cd4480d8c76ec467a85a9c6635bbf2258f0649040560382ab58cabb44bcf",
"sha256:f86642d60dca13e93260187d56c2bef2487aa4d574a669e8ceefcf9f4c26fd00",
"sha256:f8a57cbda46a94ed0db55b73e6ab0c15e78b4ede8690fa491a0e55128d552bb0",
"sha256:fcea97a352416afcbccd7af9625159d80704a25c519c251c734527329bb20d0e"
"sha256:102802a9433dcf36f939b632cce9dea87310b2f163bb37ffc8bc343677726e88",
"sha256:64abc6bf3a2ac301702f5760f4e6e227d0fd4d84d9014ef9a40faa9d43365259",
"sha256:72259661a83f8b08ef6ee83927ce4937f841226735824af5b10a536d886eeb36",
"sha256:85f1342b9d7549dd3daf494100d47a3dc7daae703cdbfc2c9ee7bbdc8a492cba",
"sha256:8ce9f88b6cb75d74eda2a5522e5c2e5ec0f17fd78605d6502abb61f46b306865",
"sha256:9936ce3a530ca78db60b6631003b5f4ba383cfb1d9830a27d1b5c61857226e2f",
"sha256:cb4e228f3d93779a1d77a1e9d72759b79dfa2975c1a5bd2a090eaa98239fa4b1"
],
"index": "pypi",
"version": "==0.5.6"
"version": "==0.6.0"
},
"multio": {
"hashes": [
@ -468,10 +452,10 @@
},
"pdbpp": {
"hashes": [
"sha256:dde77326e4ea41439c243ed065826d53539530eeabd1b6615aae15cfbb9fda05"
"sha256:535085916fcfb768690ba0aeab2967c2a2163a0a60e5b703776846873e171399"
],
"index": "pypi",
"version": "==0.9.2"
"version": "==0.9.3"
},
"piker": {
"editable": true,
@ -493,18 +477,18 @@
},
"pygments": {
"hashes": [
"sha256:78f3f434bcc5d6ee09020f92ba487f95ba50f1e3ef83ae96b9d5ffa1bab25c5d",
"sha256:dbae1046def0efb574852fab9e90209b23f556367b5a320c0bcb871c77c3e8cc"
"sha256:6301ecb0997a52d2d31385e62d0a4a4cf18d2f2da7054a5ddad5c366cd39cee7",
"sha256:82666aac15622bd7bb685a4ee7f6625dd716da3ef7473620c192c0168aae64fc"
],
"version": "==2.2.0"
"version": "==2.3.0"
},
"pytest": {
"hashes": [
"sha256:3f193df1cfe1d1609d4c583838bea3d532b18d6160fd3f55c9447fdca30848ec",
"sha256:e246cf173c01169b9617fc07264b7b1316e78d7a650055235d6d897bc80d9660"
"sha256:1d131cc532be0023ef8ae265e2a779938d0619bb6c2510f52987ffcba7fa1ee4",
"sha256:ca4761407f1acc85ffd1609f464ca20bb71a767803505bd4127d0e45c5a50e23"
],
"index": "pypi",
"version": "==3.10.1"
"version": "==4.0.1"
},
"python-dateutil": {
"hashes": [
@ -536,10 +520,10 @@
},
"sortedcontainers": {
"hashes": [
"sha256:220bb2e3e1886297fd7cdd6d164cb5cf237be1cfae1a3a3e526d149c52816682",
"sha256:b74f2756fb5e23512572cc76f0fe0832fd86310f77dfee54335a35fb33f6b950"
"sha256:974e9a32f56b17c1bac2aebd9dcf197f3eb9cd30553c5852a3187ad162e1a03a",
"sha256:d9e96492dd51fae31e60837736b38fe42a187b5404c16606ff7ee7cd582d4c60"
],
"version": "==2.0.5"
"version": "==2.1.0"
},
"trio": {
"hashes": [

View File

@ -0,0 +1,33 @@
"""
Async utils no one seems to have built into a core lib (yet).
"""
from collections import OrderedDict
def async_lifo_cache(maxsize=128):
"""Async ``cache`` with a LIFO policy.
Implemented my own since no one else seems to have
a standard. I'll wait for the smarter people to come
up with one, but until then...
"""
cache = OrderedDict()
def decorator(fn):
async def wrapper(*args):
key = args
try:
return cache[key]
except KeyError:
if len(cache) >= maxsize:
# discard last added new entry
cache.popitem()
# do it
cache[key] = await fn(*args)
return cache[key]
return wrapper
return decorator

View File

@ -6,6 +6,7 @@ from types import ModuleType
from typing import List, Dict, Any, Optional
from ..log import get_logger
from .data import DataFeed
log = get_logger('broker.core')
@ -19,7 +20,7 @@ async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict:
meth = getattr(client.api, methname, None)
if meth is None:
log.warning(
"Couldn't find API method {methname} looking up on client")
f"Couldn't find API method {methname} looking up on client")
meth = getattr(client, methname, None)
if meth is None:
@ -45,14 +46,10 @@ async def stocks_quote(
"""Return quotes dict for ``tickers``.
"""
async with brokermod.get_client() as client:
results = await client.quote(tickers)
for key, val in results.items():
if val is None:
brokermod.log.warn(f"Could not find symbol {key}?")
return results
return await client.quote(tickers)
# TODO: these need tests
async def option_chain(
brokermod: ModuleType,
symbol: str,
@ -67,7 +64,8 @@ async def option_chain(
if date:
id = int((await client.tickers2ids([symbol]))[symbol])
# build contracts dict for single expiry
return await client.option_chains({id: {date: None}})
return await client.option_chains(
{(symbol, id, date): {}})
else:
# get all contract expiries
# (takes a long-ass time on QT fwiw)
@ -83,4 +81,5 @@ async def contracts(
"""Return option contracts (all expiries) for ``symbol``.
"""
async with brokermod.get_client() as client:
# return await client.get_all_contracts([symbol])
return await client.get_all_contracts([symbol])

View File

@ -7,7 +7,10 @@ from itertools import cycle
import socket
import json
from types import ModuleType
from typing import Coroutine, Callable, Dict, List
import typing
from typing import Coroutine, Callable, Dict, List, Any
import contextlib
from operator import itemgetter
import trio
import tractor
@ -56,12 +59,14 @@ async def stream_quotes(
_cache = {} # ticker to quote caching
while True: # use an event here to trigger exit?
prequote_start = time.time()
# tickers = list(tickers2chans.keys())
with trio.move_on_after(3) as cancel_scope:
quotes = await request_quotes()
postquote_start = time.time()
cancelled = cancel_scope.cancelled_caught
if cancelled:
log.warn("Quote query timed out after 3 seconds, retrying...")
@ -69,18 +74,20 @@ async def stream_quotes(
# quotes = await wait_for_network(partial(get_quotes, tickers))
quotes = await wait_for_network(request_quotes)
postquote_start = time.time()
new_quotes = {}
new_quotes = []
if diff_cached:
# if cache is enabled then only deliver "new" changes
for symbol, quote in quotes.items():
last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items())
if new:
log.info(
f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote
new_quotes[symbol] = quote
# If cache is enabled then only deliver "new" changes.
# Useful for polling setups but obviously should be
# disabled if you're rx-ing event data.
for quote in quotes:
symbol = quote['symbol']
last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items())
if new:
log.info(
f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote
new_quotes.append(quote)
else:
new_quotes = quotes
@ -101,31 +108,51 @@ async def stream_quotes(
await trio.sleep(delay)
class DataFeed(typing.NamedTuple):
"""A per broker "data feed" container.
A structure to keep track of components used by
real-time data daemons.
"""
mod: ModuleType
client: object
quoter_keys: List[str] = ['stock', 'option']
tasks: Dict[str, trio._core._run.Task] = dict.fromkeys(
quoter_keys, False)
quoters: Dict[str, typing.Coroutine] = {}
subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}}
async def fan_out_to_chans(
brokermod: ModuleType,
feed: DataFeed,
get_quotes: Coroutine,
tickers2chans: Dict[str, tractor.Channel],
symbols2chans: Dict[str, tractor.Channel],
rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue
cid: str = None,
) -> None:
"""Request and fan out quotes to each subscribed actor channel.
"""
broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
broker_limit = getattr(feed.mod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec")
log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec")
async def request():
"""Get quotes for current symbol subscription set.
"""
return await get_quotes(list(tickers2chans.keys()))
return await get_quotes(list(symbols2chans.keys()))
async for quotes in stream_quotes(brokermod, request, rate):
async for quotes in stream_quotes(
feed.mod, request, rate,
diff_cached=diff_cached,
):
chan_payloads = {}
for symbol, quote in quotes.items():
for quote in quotes:
# is this too QT specific?
symbol = quote['symbol']
# set symbol quotes for each subscriber
for chan, cid in tickers2chans.get(symbol, set()):
for chan, cid in symbols2chans.get(quote['key'], set()):
chan_payloads.setdefault(
chan,
{'yield': {}, 'cid': cid}
@ -142,41 +169,21 @@ async def fan_out_to_chans(
ConnectionRefusedError,
):
log.warn(f"{chan} went down?")
for chanset in tickers2chans.values():
for chanset in symbols2chans.values():
chanset.discard((chan, cid))
if not any(tickers2chans.values()):
log.warn(f"No subs left for broker {brokermod.name}, exiting task")
if not any(symbols2chans.values()):
log.warn(f"No subs left for broker {feed.mod.name}, exiting task")
break
log.info(f"Terminating stream quoter task for {brokermod.name}")
log.info(f"Terminating stream quoter task for {feed.mod.name}")
async def get_cached_client(broker, tickers):
"""Get or create the current actor's cached broker client.
"""
# check if a cached client is in the local actor's statespace
clients = tractor.current_actor().statespace.setdefault('clients', {})
try:
return clients[broker]
except KeyError:
log.info(f"Creating new client for broker {broker}")
brokermod = get_brokermod(broker)
# TODO: move to AsyncExitStack in 3.7
client_cntxmng = brokermod.get_client()
client = await client_cntxmng.__aenter__()
get_quotes = await brokermod.quoter(client, tickers)
clients[broker] = (
brokermod, client, client_cntxmng, get_quotes)
return brokermod, client, client_cntxmng, get_quotes
async def symbol_data(broker, tickers):
async def symbol_data(broker: str, tickers: List[str]):
"""Retrieve baseline symbol info from broker.
"""
_, client, _, get_quotes = await get_cached_client(broker, tickers)
return await client.symbol_data(tickers)
feed = await get_cached_feed(broker)
return await feed.client.symbol_data(tickers)
async def smoke_quote(get_quotes, tickers, broker):
@ -192,8 +199,9 @@ async def smoke_quote(get_quotes, tickers, broker):
# since the new client needs to know what symbols are accepted
log.warn(f"Retrieving smoke quote for symbols {tickers}")
quotes = await get_quotes(tickers)
# report any tickers that aren't returned in the first quote
invalid_tickers = set(tickers) - set(quotes)
invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes))
for symbol in invalid_tickers:
tickers.remove(symbol)
log.warn(
@ -202,7 +210,8 @@ async def smoke_quote(get_quotes, tickers, broker):
# pop any tickers that return "empty" quotes
payload = {}
for symbol, quote in quotes.items():
for quote in quotes:
symbol = quote['symbol']
if quote is None:
log.warn(
f"Symbol `{symbol}` not found by broker"
@ -210,6 +219,12 @@ async def smoke_quote(get_quotes, tickers, broker):
# XXX: not this mutates the input list (for now)
tickers.remove(symbol)
continue
# report any unknown/invalid symbols (QT specific)
if quote.get('low52w', False) is None:
log.warn(
f"{symbol} seems to be defunct")
payload[symbol] = quote
return payload
@ -218,23 +233,25 @@ async def smoke_quote(get_quotes, tickers, broker):
###########################################
def modify_quote_stream(broker, tickers, chan=None, cid=None):
async def modify_quote_stream(broker, feed_type, symbols, chan=None, cid=None):
"""Absolute symbol subscription list for each quote stream.
Effectively a consumer subscription api.
Effectively a symbol subscription api.
"""
log.info(f"{chan} changed symbol subscription to {tickers}")
ss = tractor.current_actor().statespace
broker2tickersubs = ss['broker2tickersubs']
tickers2chans = broker2tickersubs.get(broker)
log.info(f"{chan} changed symbol subscription to {symbols}")
feed = await get_cached_feed(broker)
symbols2chans = feed.subscriptions[feed_type]
# update map from each symbol to requesting client's chan
for ticker in tickers:
tickers2chans.setdefault(ticker, set()).add((chan, cid))
for ticker in symbols:
symbols2chans.setdefault(ticker, set()).add((chan, cid))
# remove any existing symbol subscriptions if symbol is not
# found in ``symbols``
# TODO: this can likely be factored out into the pub-sub api
for ticker in filter(
lambda ticker: ticker not in tickers, tickers2chans.copy()
lambda ticker: ticker not in symbols, symbols2chans.copy()
):
chanset = tickers2chans.get(ticker)
chanset = symbols2chans.get(ticker)
# XXX: cid will be different on unsub call
for item in chanset.copy():
if chan in item:
@ -242,12 +259,42 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None):
if not chanset:
# pop empty sets which will trigger bg quoter task termination
tickers2chans.pop(ticker)
symbols2chans.pop(ticker)
async def get_cached_feed(
brokername: str,
) -> DataFeed:
"""Get/create a ``DataFeed`` from/in the current actor.
"""
# check if a cached client is in the local actor's statespace
ss = tractor.current_actor().statespace
feeds = ss['feeds']
lock = feeds['_lock']
feed_stack = ss['feed_stacks'][brokername]
async with lock:
try:
feed = feeds[brokername]
log.info(f"Subscribing with existing `{brokername}` daemon")
return feed
except KeyError:
log.info(f"Creating new client for broker {brokername}")
brokermod = get_brokermod(brokername)
client = await feed_stack.enter_async_context(
brokermod.get_client())
feed = DataFeed(
mod=brokermod,
client=client,
)
feeds[brokername] = feed
return feed
async def start_quote_stream(
broker: str,
tickers: List[str],
symbols: List[Any],
feed_type: str = 'stock',
diff_cached: bool = True,
chan: tractor.Channel = None,
cid: str = None,
) -> None:
@ -263,57 +310,75 @@ async def start_quote_stream(
get_console_log(actor.loglevel)
# pull global vars from local actor
ss = actor.statespace
broker2tickersubs = ss['broker2tickersubs']
clients = ss['clients']
dtasks = ss['dtasks']
tickers = list(tickers)
# broker2symbolsubs = ss.setdefault('broker2symbolsubs', {})
ss.setdefault('feeds', {'_lock': trio.Lock()})
feed_stacks = ss.setdefault('feed_stacks', {})
symbols = list(symbols)
log.info(
f"{chan.uid} subscribed to {broker} for tickers {tickers}")
f"{chan.uid} subscribed to {broker} for symbols {symbols}")
feed_stack = feed_stacks.setdefault(broker, contextlib.AsyncExitStack())
# another actor task may have already created it
feed = await get_cached_feed(broker)
symbols2chans = feed.subscriptions[feed_type]
brokermod, client, _, get_quotes = await get_cached_client(broker, tickers)
if broker not in broker2tickersubs:
tickers2chans = broker2tickersubs.setdefault(broker, {})
else:
log.info(f"Subscribing with existing `{broker}` daemon")
tickers2chans = broker2tickersubs[broker]
# do a smoke quote (note this mutates the input list and filters
# out bad symbols for now)
payload = await smoke_quote(get_quotes, tickers, broker)
# push initial smoke quote response for client initialization
await chan.send({'yield': payload, 'cid': cid})
if feed_type == 'stock':
get_quotes = feed.quoters.setdefault(
'stock',
await feed.mod.stock_quoter(feed.client, symbols)
)
# do a smoke quote (note this mutates the input list and filters
# out bad symbols for now)
payload = await smoke_quote(get_quotes, symbols, broker)
# push initial smoke quote response for client initialization
await chan.send({'yield': payload, 'cid': cid})
elif feed_type == 'option':
# FIXME: yeah we need maybe a more general way to specify
# the arg signature for the option feed beasides a symbol
# + expiry date.
get_quotes = feed.quoters.setdefault(
'option',
await feed.mod.option_quoter(feed.client, symbols)
)
# update map from each symbol to requesting client's chan
modify_quote_stream(broker, tickers, chan=chan, cid=cid)
await modify_quote_stream(broker, feed_type, symbols, chan, cid)
try:
if broker not in dtasks:
# no quoter task yet so start a daemon task
log.info(f"Spawning quoter task for {brokermod.name}")
async with trio.open_nursery() as nursery:
nursery.start_soon(partial(
fan_out_to_chans, brokermod, get_quotes, tickers2chans,
cid=cid)
)
dtasks.add(broker)
if not feed.tasks.get(feed_type):
# no data feeder task yet; so start one
respawn = True
log.info(f"Spawning data feed task for {feed.mod.name}")
while respawn:
respawn = False
try:
async with trio.open_nursery() as nursery:
nursery.start_soon(
partial(
fan_out_to_chans, feed, get_quotes,
symbols2chans,
diff_cached=diff_cached,
cid=cid
)
)
feed.tasks[feed_type] = True
except trio.BrokenResourceError:
log.exception("Respawning failed data feed task")
respawn = True
# unblocks when no more symbols subscriptions exist and the
# quote streamer task terminates (usually because another call
# was made to `modify_quoter` to unsubscribe from streaming
# symbols)
log.info(f"Terminated quoter task for {brokermod.name}")
# TODO: move to AsyncExitStack in 3.7
for _, _, cntxmng, _ in clients.values():
# FIXME: yes I know there's no error handling..
await cntxmng.__aexit__(None, None, None)
finally:
log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}")
feed.tasks.pop(feed_type)
# if there are truly no more subscriptions with this broker
# drop from broker subs dict
if not any(tickers2chans.values()):
if not any(symbols2chans.values()):
log.info(f"No more subscriptions for {broker}")
broker2tickersubs.pop(broker, None)
dtasks.discard(broker)
# broker2symbolsubs.pop(broker, None)
# destroy the API client
await feed_stack.aclose()
async def stream_to_file(

View File

@ -5,7 +5,8 @@ import time
from datetime import datetime
from functools import partial
import configparser
from typing import List, Tuple, Dict, Any
from operator import itemgetter
from typing import List, Tuple, Dict, Any, Iterator, NamedTuple
import trio
from async_generator import asynccontextmanager
@ -14,6 +15,7 @@ from ..calc import humanize, percent_change
from . import config
from ._util import resproc, BrokerError
from ..log import get_logger, colorize_json
from .._async_utils import async_lifo_cache
# TODO: move to urllib3/requests once supported
import asks
@ -23,13 +25,19 @@ log = get_logger(__name__)
_refresh_token_ep = 'https://login.questrade.com/oauth2/'
_version = 'v1'
_rate_limit = 3 # queries/sec
_rate_limit = 4 # queries/sec
class QuestradeError(Exception):
"Non-200 OK response code"
class ContractsKey(NamedTuple):
symbol: str
id: int
expiry: datetime
class _API:
"""Questrade API endpoints exposed as methods and wrapped with an
http session.
@ -60,7 +68,11 @@ class _API:
'symbols', params={'ids': ids, 'names': names})
async def quotes(self, ids: str) -> dict:
return await self._request('markets/quotes', params={'ids': ids})
quotes = (await self._request(
'markets/quotes', params={'ids': ids}))['quotes']
for quote in quotes:
quote['key'] = quote['symbol']
return quotes
async def candles(self, id: str, start: str, end, interval) -> dict:
return await self._request(f'markets/candles/{id}', params={})
@ -78,20 +90,19 @@ class _API:
async def option_quotes(
self,
contracts: Dict[int, Dict[str, dict]],
contracts: Dict[ContractsKey, Dict[int, dict]],
option_ids: List[int] = [], # if you don't want them all
) -> dict:
"Retrieve option chain quotes for all option ids or by filter(s)."
"""Retrieve option chain quotes for all option ids or by filter(s).
"""
filters = [
{
"underlyingId": int(symbol_id),
"expiryDate": str(expiry),
}
# every expiry per symbol id
for symbol_id, expiries in contracts.items()
for expiry in expiries
for (symbol, symbol_id, expiry), bystrike in contracts.items()
]
resp = await self._sess.post(
path=f'/markets/quotes/options',
json={'filters': filters, 'optionIds': option_ids}
@ -110,8 +121,9 @@ class Client:
self.api = _API(self._sess)
self._conf = config
self.access_data = {}
self.user_data = {}
self._reload_config(config)
self._symbol_cache: Dict[str, int] = {}
self._contracts2expiries = {}
def _reload_config(self, config=None, **kwargs):
log.warn("Reloading access config data")
@ -212,12 +224,25 @@ class Client:
async def tickers2ids(self, tickers):
"""Helper routine that take a sequence of ticker symbols and returns
their corresponding QT symbol ids.
their corresponding QT numeric symbol ids.
Cache any symbol to id lookups for later use.
"""
data = await self.api.symbols(names=','.join(tickers))
cache = self._symbol_cache
symbols2ids = {}
for ticker, symbol in zip(tickers, data['symbols']):
symbols2ids[symbol['symbol']] = str(symbol['symbolId'])
for symbol in tickers:
id = cache.get(symbol)
if id is not None:
symbols2ids[symbol] = id
# still missing uncached values - hit the server
to_lookup = list(set(tickers) - set(symbols2ids))
if to_lookup:
data = await self.api.symbols(names=','.join(to_lookup))
for ticker, symbol in zip(to_lookup, data['symbols']):
name = symbol['symbol']
assert name == ticker
cache[name] = symbols2ids[name] = str(symbol['symbolId'])
return symbols2ids
@ -236,22 +261,17 @@ class Client:
"""Return stock quotes for each ticker in ``tickers``.
"""
t2ids = await self.tickers2ids(tickers)
ids = ','.join(t2ids.values())
results = (await self.api.quotes(ids=ids))['quotes']
quotes = {quote['symbol']: quote for quote in results}
# set None for all symbols not found
if len(t2ids) < len(tickers):
for ticker in tickers:
if ticker not in quotes:
quotes[ticker] = None
quotes = []
if t2ids:
ids = ','.join(t2ids.values())
quotes = (await self.api.quotes(ids=ids))
return quotes
async def symbol2contracts(
self,
symbol: str
) -> Tuple[int, Dict[datetime, dict]]:
) -> Dict[Tuple[str, int, datetime], dict]:
"""Return option contract for the given symbol.
The most useful part is the expiries which can be passed to the option
@ -259,15 +279,18 @@ class Client:
"""
id = int((await self.tickers2ids([symbol]))[symbol])
contracts = await self.api.option_contracts(id)
return id, {
# convert to native datetime objs for sorting
datetime.fromisoformat(item['expiryDate']):
item for item in contracts
return {
ContractsKey(
symbol=symbol,
id=id,
# convert to native datetime objs for sorting
expiry=datetime.fromisoformat(item['expiryDate'])):
item for item in contracts
}
async def get_all_contracts(
self,
symbols: List[str],
symbols: Iterator[str],
# {symbol_id: {dt_iso_contract: {strike_price: {contract_id: id}}}}
) -> Dict[int, Dict[str, Dict[int, Any]]]:
"""Look up all contracts for each symbol in ``symbols`` and return the
@ -278,21 +301,29 @@ class Client:
per symbol) and thus the return values should be cached for use with
``option_chains()``.
"""
by_id = {}
by_key = {}
for symbol in symbols:
id, contracts = await self.symbol2contracts(symbol)
by_id[id] = {
dt.isoformat(timespec='microseconds'): {
contracts = await self.symbol2contracts(symbol)
# FIXME: chainPerRoot here is probably why in some UIs
# you see a second chain with a (1) suffixed; should
# probably handle this eventually.
for key, byroot in sorted(
# sort by datetime
contracts.items(),
key=lambda item: item[0].expiry
):
by_key[
ContractsKey(
key.symbol,
key.id,
# converting back - maybe just do this initially?
key.expiry.isoformat(timespec='microseconds'),
)
] = {
item['strikePrice']: item for item in
byroot['chainPerRoot'][0]['chainPerStrikePrice']
}
for dt, byroot in sorted(
# sort by datetime
contracts.items(),
key=lambda item: item[0]
)
}
return by_id
return by_key
async def option_chains(
self,
@ -301,12 +332,14 @@ class Client:
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return option chain snap quote for each ticker in ``symbols``.
"""
quotes = await self.api.option_quotes(contracts)
batch = {}
for quote in quotes:
batch.setdefault(
quote['underlying'], {}
)[quote['symbol']] = quote
batch = []
for key, bystrike in contracts.items():
quotes = await self.api.option_quotes({key: bystrike})
for quote in quotes:
# index by .symbol, .expiry since that's what
# a subscriber (currently) sends initially
quote['key'] = (key[0], key[2])
batch.extend(quotes)
return batch
@ -376,76 +409,115 @@ async def get_client() -> Client:
write_conf(client)
async def quoter(client: Client, tickers: List[str]):
"""Stock Quoter context.
async def stock_quoter(client: Client, tickers: List[str]):
"""Stock quoter context.
Yeah so fun times..QT has this symbol to ``int`` id lookup system that you
have to use to get any quotes. That means we try to be smart and maintain
a cache of this map lazily as requests from in for new tickers/symbols.
Most of the closure variables here are to deal with that.
"""
t2ids = {}
ids = ''
def filter_symbols(quotes_dict: dict):
nonlocal t2ids
for symbol, quote in quotes_dict.items():
if quote['low52w'] is None:
log.warn(
f"{symbol} seems to be defunct discarding from tickers")
t2ids.pop(symbol)
@async_lifo_cache(maxsize=128)
async def get_symbol_id_seq(symbols: Tuple[str]):
"""For each tuple ``(symbol_1, symbol_2, ... , symbol_n)``
return a symbol id sequence string ``'id_1,id_2, ... , id_n'``.
"""
return ','.join(map(str, (await client.tickers2ids(symbols)).values()))
async def get_quote(tickers):
"""Query for quotes using cached symbol ids.
"""
if not tickers:
# don't hit the network
return {}
nonlocal ids, t2ids
new, current = set(tickers), set(t2ids.keys())
if new != current:
# update ticker ids cache
log.debug(f"Tickers set changed {new - current}")
t2ids = await client.tickers2ids(tickers)
# re-save symbol -> ids cache
ids = ','.join(map(str, t2ids.values()))
ids = await get_symbol_id_seq(tuple(tickers))
try:
quotes_resp = await client.api.quotes(ids=ids)
except (QuestradeError, BrokerError) as qterr:
if "Access token is invalid" in str(qterr.args[0]):
# out-of-process piker actor may have
# renewed already..
client._reload_config()
try:
quotes_resp = await client.api.quotes(ids=ids)
except BrokerError as qterr:
if "Access token is invalid" in str(qterr.args[0]):
# TODO: this will crash when run from a sub-actor since
# STDIN can't be acquired. The right way to handle this
# is to make a request to the parent actor (i.e.
# spawner of this) to call this
# `client.ensure_access()` locally thus blocking until
# the user provides an API key on the "client side"
await client.ensure_access(force_refresh=True)
quotes_resp = await client.api.quotes(ids=ids)
else:
if "Access token is invalid" not in str(qterr.args[0]):
raise
# out-of-process piker actor may have
# renewed already..
client._reload_config()
try:
quotes_resp = await client.api.quotes(ids=ids)
except BrokerError as qterr:
if "Access token is invalid" in str(qterr.args[0]):
# TODO: this will crash when run from a sub-actor since
# STDIN can't be acquired. The right way to handle this
# is to make a request to the parent actor (i.e.
# spawner of this) to call this
# `client.ensure_access()` locally thus blocking until
# the user provides an API key on the "client side"
await client.ensure_access(force_refresh=True)
quotes_resp = await client.api.quotes(ids=ids)
# dict packing and post-processing
quotes = {}
for quote in quotes_resp['quotes']:
quotes[quote['symbol']] = quote
# post-processing
for quote in quotes_resp:
if quote.get('delay', 0) > 0:
log.warn(f"Delayed quote:\n{quote}")
return quotes
return quotes_resp
# strip out unknown/invalid symbols
first_quotes_dict = await get_quote(tickers)
filter_symbols(first_quotes_dict)
# re-save symbol -> ids cache
ids = ','.join(map(str, t2ids.values()))
return get_quote
async def option_quoter(client: Client, tickers: List[str]):
"""Option quoter context.
"""
# sanity
if isinstance(tickers[0], tuple):
datetime.fromisoformat(tickers[0][1])
else:
log.warn(f"Ignoring option quoter call with {tickers}")
# TODO make caller always check that a quoter has been set
return
@async_lifo_cache(maxsize=128)
async def get_contract_by_date(sym_date_pairs: Tuple[Tuple[str, str]]):
"""For each tuple,
``(symbol_date_1, symbol_date_2, ... , symbol_date_n)``
return a contract dict.
"""
symbols = map(itemgetter(0), sym_date_pairs)
dates = map(itemgetter(1), sym_date_pairs)
contracts = await client.get_all_contracts(symbols)
selected = {}
for key, val in contracts.items():
if key.expiry in dates:
selected[key] = val
return selected
async def get_quote(symbol_date_pairs):
"""Query for quotes using cached symbol ids.
"""
contracts = await get_contract_by_date(
tuple(symbol_date_pairs))
try:
quotes = await client.option_chains(contracts)
except (QuestradeError, BrokerError) as qterr:
if "Access token is invalid" not in str(qterr.args[0]):
raise
# out-of-process piker actor may have
# renewed already..
client._reload_config()
try:
quotes = await client.option_chains(contracts)
except BrokerError as qterr:
if "Access token is invalid" in str(qterr.args[0]):
# TODO: this will crash when run from a sub-actor since
# STDIN can't be acquired. The right way to handle this
# is to make a request to the parent actor (i.e.
# spawner of this) to call this
# `client.ensure_access()` locally thus blocking until
# the user provides an API key on the "client side"
await client.ensure_access(force_refresh=True)
quotes = await client.option_chains(contracts)
return quotes
return get_quote

View File

@ -2,6 +2,7 @@
Robinhood API backend.
"""
from functools import partial
from typing import List
from async_generator import asynccontextmanager
# TODO: move to urllib3/requests once supported
@ -44,7 +45,7 @@ class Client:
self._sess.base_location = _service_ep
self.api = _API(self._sess)
def _zip_in_order(self, symbols: [str], results_dict: dict):
def _zip_in_order(self, symbols: [str], quotes: List[dict]):
return {quote.get('symbol', sym) if quote else sym: quote
for sym, quote in zip(symbols, results_dict)}
@ -52,11 +53,16 @@ class Client:
"""Retrieve quotes for a list of ``symbols``.
"""
try:
resp = await self.api.quotes(','.join(symbols))
quotes = (await self.api.quotes(','.join(symbols)))['results']
except BrokerError:
resp = {'results': [None] * len(symbols)}
quotes = [None] * len(symbols)
return self._zip_in_order(symbols, resp['results'])
for quote in quotes:
# insert our subscription key field
if quote is not None:
quote['key'] = quote['symbol']
return list(filter(bool, quotes))
async def symbol_data(self, symbols: [str]):
"""Retrieve symbol data via the ``fundamentals`` endpoint.

View File

@ -4,6 +4,8 @@ Console interface to broker client/daemons.
from functools import partial
import json
import os
from operator import attrgetter
from operator import itemgetter
import click
import pandas as pd
@ -32,11 +34,6 @@ def pikerd(loglevel, host, tl):
get_console_log(loglevel)
tractor.run_daemon(
rpc_module_paths=['piker.brokers.data'],
statespace={
'broker2tickersubs': {},
'clients': {},
'dtasks': set(),
},
name='brokerd',
loglevel=loglevel if tl else None,
)
@ -105,6 +102,12 @@ def quote(loglevel, broker, tickers, df_output):
log.error(f"No quotes could be found for {tickers}?")
return
if len(quotes) < len(tickers):
syms = tuple(map(itemgetter('symbol'), quotes))
for ticker in tickers:
if ticker not in syms:
brokermod.log.warn(f"Could not find symbol {ticker}?")
if df_output:
cols = next(filter(bool, quotes.values())).copy()
cols.pop('symbol')
@ -130,11 +133,6 @@ async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None):
"No broker daemon could be found, spawning brokerd..")
portal = await nursery.start_actor(
'brokerd',
statespace={
'broker2tickersubs': {},
'clients': {},
'dtasks': set(),
},
rpc_module_paths=['piker.brokers.data'],
loglevel=loglevel,
)
@ -168,23 +166,10 @@ def monitor(loglevel, broker, rate, name, dhost, test, tl):
async with maybe_spawn_brokerd_as_subactor(
tries=tries, loglevel=loglevel
) as portal:
if test:
# stream from a local test file
agen = await portal.run(
"piker.brokers.data", 'stream_from_file',
filename=test
)
# agen = data.stream_from_file(test)
else:
# start live streaming from broker daemon
agen = await portal.run(
"piker.brokers.data", 'start_quote_stream',
broker=brokermod.name, tickers=tickers)
# run app "main"
await _async_main(
name, portal, tickers,
brokermod, rate, agen,
brokermod, rate, test=test,
)
tractor.run(
@ -330,14 +315,16 @@ def dump(ctx, name):
def contracts(loglevel, broker, symbol, ids):
brokermod = get_brokermod(broker)
get_console_log(loglevel)
quotes = trio.run(partial(core.contracts, brokermod, symbol))
contracts = trio.run(partial(core.contracts, brokermod, symbol))
if not ids:
# just print out expiry dates which can be used with
# the option_chain_quote cmd
id, contracts = next(iter(quotes.items()))
quotes = list(contracts)
output = tuple(map(attrgetter('expiry'), contracts))
else:
output = tuple(contracts.items())
click.echo(colorize_json(quotes))
# TODO: need a cli test to verify
click.echo(colorize_json(output))
@cli.command()
@ -358,17 +345,15 @@ def optsquote(loglevel, broker, symbol, df_output, date):
partial(
core.option_chain, brokermod, symbol, date
)
)[symbol]
)
if not quotes:
log.error(f"No quotes could be found for {symbol}?")
log.error(f"No option quotes could be found for {symbol}?")
return
if df_output:
cols = next(filter(bool, quotes.values())).copy()
df = pd.DataFrame(
(quote.values() for contract, quote in quotes.items()),
index=quotes.keys(),
columns=cols.keys(),
(quote.values() for quote in quotes),
columns=quotes[0].keys(),
)
click.echo(df)
else:

View File

@ -19,6 +19,7 @@ from kivy.lang import Builder
from kivy import utils
from kivy.app import async_runTouchApp
from kivy.core.window import Window
from async_generator import aclosing
from ..log import get_logger
from .pager import PagerView
@ -513,13 +514,24 @@ async def _async_main(
tickers: List[str],
brokermod: ModuleType,
rate: int,
# an async generator instance which yields quotes dict packets
quote_gen: AsyncGeneratorType,
test: bool = False
) -> None:
'''Launch kivy app + all other related tasks.
This is started with cli cmd `piker monitor`.
'''
if test:
# stream from a local test file
quote_gen = await portal.run(
"piker.brokers.data", 'stream_from_file',
filename=test
)
else:
# start live streaming from broker daemon
quote_gen = await portal.run(
"piker.brokers.data", 'start_quote_stream',
broker=brokermod.name, symbols=tickers)
# subscribe for tickers (this performs a possible filtering
# where invalid symbols are discarded)
sd = await portal.run(
@ -594,14 +606,17 @@ async def _async_main(
try:
# Trio-kivy entry point.
await async_runTouchApp(widgets['root']) # run kivy
await quote_gen.aclose() # cancel aysnc gen call
finally:
await quote_gen.aclose() # cancel aysnc gen call
# un-subscribe from symbols stream (cancel if brokerd
# was already torn down - say by SIGINT)
with trio.move_on_after(0.2):
await portal.run(
"piker.brokers.data", 'modify_quote_stream',
broker=brokermod.name, tickers=[])
broker=brokermod.name,
feed_type='stock',
symbols=[]
)
# cancel GUI update task
nursery.cancel_scope.cancel()

View File

@ -1,4 +1,20 @@
import pytest
import tractor
from piker import log
def pytest_addoption(parser):
parser.addoption("--ll", action="store", dest='loglevel',
default=None, help="logging level to set when testing")
@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel
log.get_console_log(level)
yield level
tractor.log._default_loglevel = orig
@pytest.fixture

View File

@ -18,11 +18,11 @@ def run(cmd, *args):
return cp
def verify_keys(tickers, quotes_dict):
def verify_keys(tickers, quotes):
"""Verify all ticker names are keys in ``quotes_dict``.
"""
for key, quote in quotes_dict.items():
assert key in tickers
for quote in quotes:
assert quote['key'] in tickers
@pytest.fixture
@ -39,8 +39,8 @@ def test_known_quotes(capfd, nyse_tickers):
# verify output can be parsed as json
out, err = capfd.readouterr()
quotes_dict = json.loads(out)
verify_keys(nyse_tickers, quotes_dict)
quotes = json.loads(out)
verify_keys(nyse_tickers, quotes)
@pytest.mark.parametrize(
@ -61,8 +61,8 @@ def test_quotes_ticker_not_found(
out, err = capfd.readouterr()
if out:
# verify output can be parsed as json
quotes_dict = json.loads(out)
verify_keys(tickers, quotes_dict)
quotes = json.loads(out)
verify_keys(tickers, quotes)
# check for warning log message when some quotes are found
warnmsg = f'Could not find symbol {bad_ticker[0]}'
assert warnmsg in err

View File

@ -4,7 +4,9 @@ Questrade broker testing
import time
import trio
import tractor
from trio.testing import trio_test
from tractor.testing import tractor_test
from piker.brokers import questrade as qt
import pytest
@ -18,72 +20,75 @@ def check_qt_conf_section(brokerconf):
# stock quote
_ex_quote = {
"VWAP": 7.383792,
"askPrice": 7.56,
"askSize": 2,
"bidPrice": 6.1,
"bidSize": 2,
"delay": 0,
"high52w": 9.68,
"highPrice": 8,
"isHalted": 'false',
"lastTradePrice": 6.96,
"lastTradePriceTrHrs": 6.97,
"lastTradeSize": 2000,
"lastTradeTick": "Down",
"lastTradeTime": "2018-02-07T15:59:59.259000-05:00",
"low52w": 1.03,
"lowPrice": 6.88,
"openPrice": 7.64,
"symbol": "EMH.VN",
"symbolId": 10164524,
"tier": "",
"volume": 5357805
_ex_quotes = {
'stock': {
"VWAP": 7.383792,
"askPrice": 7.56,
"askSize": 2,
"bidPrice": 6.1,
"bidSize": 2,
"delay": 0,
"high52w": 9.68,
"highPrice": 8,
"key": "EMH.VN",
"isHalted": 'false',
"lastTradePrice": 6.96,
"lastTradePriceTrHrs": 6.97,
"lastTradeSize": 2000,
"lastTradeTick": "Down",
"lastTradeTime": "2018-02-07T15:59:59.259000-05:00",
"low52w": 1.03,
"lowPrice": 6.88,
"openPrice": 7.64,
"symbol": "EMH.VN",
"symbolId": 10164524,
"tier": "",
"volume": 5357805
},
'option': {
'VWAP': 0,
'askPrice': None,
'askSize': 0,
'bidPrice': None,
'bidSize': 0,
'delay': 0,
'delta': -0.212857,
'gamma': 0.003524,
'highPrice': 0,
'isHalted': False,
"key": ["WEED.TO", '2018-10-23T00:00:00.000000-04:00'],
'lastTradePrice': 22,
'lastTradePriceTrHrs': None,
'lastTradeSize': 0,
'lastTradeTick': 'Equal',
'lastTradeTime': '2018-10-23T00:00:00.000000-04:00',
'lowPrice': 0,
'openInterest': 1,
'openPrice': 0,
'rho': -0.891868,
'symbol': 'WEED15Jan21P54.00.MX',
'symbolId': 22739148,
'theta': -0.012911,
'underlying': 'WEED.TO',
'underlyingId': 16529510,
'vega': 0.220885,
'volatility': 75.514171,
'volume': 0
}
}
# option quote
_ex_contract = {
'VWAP': 0,
'askPrice': None,
'askSize': 0,
'bidPrice': None,
'bidSize': 0,
'delay': 0,
'delta': -0.212857,
'gamma': 0.003524,
'highPrice': 0,
'isHalted': False,
'lastTradePrice': 22,
'lastTradePriceTrHrs': None,
'lastTradeSize': 0,
'lastTradeTick': 'Equal',
'lastTradeTime': '2018-10-23T00:00:00.000000-04:00',
'lowPrice': 0,
'openInterest': 1,
'openPrice': 0,
'rho': -0.891868,
'symbol': 'WEED15Jan21P54.00.MX',
'symbolId': 22739148,
'theta': -0.012911,
'underlying': 'WEED.TO',
'underlyingId': 16529510,
'vega': 0.220885,
'volatility': 75.514171,
'volume': 0
}
def match_packet(symbols, quotes):
def match_packet(symbols, quotes, feed_type='stock'):
"""Verify target ``symbols`` match keys in ``quotes`` packet.
"""
assert len(quotes) == len(symbols)
for ticker in symbols:
quote = quotes.pop(ticker)
# for ticker in symbols:
for quote in quotes.copy():
assert quote['key'] in symbols
quotes.remove(quote)
# verify the quote packet format hasn't changed
for key in _ex_quote:
for key in _ex_quotes[feed_type]:
quote.pop(key)
# no additional fields either
@ -104,11 +109,11 @@ async def test_batched_stock_quote(us_symbols):
@trio_test
async def test_quoter_context(us_symbols):
async def test_stock_quoter_context(us_symbols):
"""Test that a quoter "context" used by the data feed daemon.
"""
async with qt.get_client() as client:
quoter = await qt.quoter(client, us_symbols)
quoter = await qt.stock_quoter(client, us_symbols)
quotes = await quoter(us_symbols)
match_packet(us_symbols, quotes)
@ -119,12 +124,14 @@ async def test_option_contracts(tmx_symbols):
"""
async with qt.get_client() as client:
for symbol in tmx_symbols:
id, contracts = await client.symbol2contracts(symbol)
assert isinstance(id, int)
assert isinstance(contracts, dict)
for dt in contracts:
assert dt.isoformat(
timespec='microseconds') == contracts[dt]['expiryDate']
contracts = await client.symbol2contracts(symbol)
key, byroot = next(iter(contracts.items()))
assert isinstance(key.id, int)
assert isinstance(byroot, dict)
for key in contracts:
# check that datetime is same as reported in contract
assert key.expiry.isoformat(
timespec='microseconds') == contracts[key]['expiryDate']
@trio_test
@ -133,17 +140,15 @@ async def test_option_chain(tmx_symbols):
"""
async with qt.get_client() as client:
# contract lookup - should be cached
contracts = await client.get_all_contracts(tmx_symbols)
contracts = await client.get_all_contracts([tmx_symbols[0]])
# chains quote for all symbols
quotes = await client.option_chains(contracts)
for key in tmx_symbols:
contracts = quotes.pop(key)
for key, quote in contracts.items():
for key in _ex_contract:
quote.pop(key)
assert not quote
# chains for each symbol were retreived
assert not quotes
# verify contents match what we expect
for quote in quotes:
assert quote['underlying'] in tmx_symbols
for key in _ex_quotes['option']:
quote.pop(key)
assert not quote
@trio_test
@ -163,7 +168,7 @@ async def test_option_quote_latency(tmx_symbols):
# NOTE: request latency is usually 2x faster that these
(5, contracts), (0.5, single)
]:
for _ in range(10):
for _ in range(3):
# chains quote for all symbols
start = time.time()
await client.option_chains(contract)
@ -171,3 +176,117 @@ async def test_option_quote_latency(tmx_symbols):
print(f"Request took {took}")
assert took <= expected_latency
await trio.sleep(0.1)
async def stream_option_chain(portal, symbols):
"""Start up an option quote stream.
``symbols`` arg is ignored here.
"""
symbol = 'APHA.TO' # your fave greenhouse LP
async with qt.get_client() as client:
contracts = await client.get_all_contracts([symbol])
contractkey = next(iter(contracts))
subs_keys = list(
map(lambda item: (item.symbol, item.expiry), contracts))
sub = subs_keys[0]
agen = await portal.run(
'piker.brokers.data',
'start_quote_stream',
broker='questrade',
symbols=[sub],
feed_type='option',
diff_cached=False,
)
try:
# wait on the data streamer to actually start
# delivering
await agen.__anext__()
# it'd sure be nice to have an asyncitertools here...
with trio.fail_after(2.1):
loops = 8
count = 0
async for quotes in agen:
# print(f'got quotes for {quotes.keys()}')
# we should receive all calls and puts
assert len(quotes) == len(contracts[contractkey]) * 2
for symbol, quote in quotes.items():
assert quote['key'] == sub
for key in _ex_quotes['option']:
quote.pop(key)
assert not quote
count += 1
if count == loops:
break
finally:
# unsub
await portal.run(
'piker.brokers.data',
'modify_quote_stream',
broker='questrade',
feed_type='option',
symbols=[],
)
async def stream_stocks(portal, symbols):
"""Start up a stock quote stream.
"""
agen = await portal.run(
'piker.brokers.data',
'start_quote_stream',
broker='questrade',
symbols=symbols,
)
try:
# it'd sure be nice to have an asyncitertools here...
async for quotes in agen:
assert quotes
for key in quotes:
assert key in symbols
break
finally:
# unsub
await portal.run(
'piker.brokers.data',
'modify_quote_stream',
broker='questrade',
feed_type='stock',
symbols=[],
)
@pytest.mark.parametrize(
'stream_what',
[
(stream_stocks,),
(stream_option_chain,),
(stream_stocks, stream_option_chain),
],
ids=['stocks', 'options', 'stocks_and_options'],
)
@tractor_test
async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
"""Set up option streaming using the broker daemon.
"""
async with tractor.find_actor('brokerd') as portal:
async with tractor.open_nursery() as nursery:
# only one per host address, spawns an actor if None
if not portal:
# no brokerd actor found
portal = await nursery.start_actor(
'data_feed',
rpc_module_paths=[
'piker.brokers.data',
'piker.brokers.core'
],
)
async with trio.open_nursery() as n:
for func in stream_what:
n.start_soon(func, portal, tmx_symbols)
# stop all spawned subactors
await nursery.cancel()

View File

@ -1,61 +0,0 @@
"""
Actor model API testing
"""
import pytest
import tractor
async def rx_price_quotes_from_brokerd(us_symbols):
"""Verify we can spawn a daemon actor and retrieve streamed price data.
"""
async with tractor.find_actor('brokerd') as portals:
if not portals:
# only one per host address, spawns an actor if None
async with tractor.open_nursery() as nursery:
# no brokerd actor found
portal = await nursery.start_actor(
'brokerd',
rpc_module_paths=['piker.brokers.data'],
statespace={
'broker2tickersubs': {},
'clients': {},
'dtasks': set()
},
)
# gotta expose in a broker agnostic way...
# retrieve initial symbol data
# sd = await portal.run(
# 'piker.brokers.data', 'symbol_data', symbols=us_symbols)
# assert list(sd.keys()) == us_symbols
gen = await portal.run(
'piker.brokers.data',
'start_quote_stream',
broker='robinhood',
tickers=us_symbols,
)
# it'd sure be nice to have an asyncitertools here...
async for quotes in gen:
assert quotes
for key in quotes:
assert key in us_symbols
break
# terminate far-end async-gen
# await gen.asend(None)
# break
# stop all spawned subactors
await nursery.cancel()
# arbitter is cancelled here due to `find_actors()` internals
# (which internally uses `get_arbiter` which kills its channel
# server scope on exit)
def test_rx_price_quotes_from_brokerd(us_symbols):
tractor.run(
rx_price_quotes_from_brokerd,
us_symbols,
name='arbiter',
)