Merge pull request #50 from pikers/tractor_draft

Introducing "tractor"
kivy_mainline_and_py3.8
goodboy 2018-07-05 15:52:53 -04:00 committed by GitHub
commit 6ff871ff7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1769 additions and 568 deletions

427
Pipfile.lock generated
View File

@ -1,7 +1,7 @@
{ {
"_meta": { "_meta": {
"hash": { "hash": {
"sha256": "e3eb385c3342f53cdb1793dc2055ec8f42669a15854b358632a5ceaa9dc1135b" "sha256": "8d6521c10626550c727da281b89d0b6df5d31eb7dc5fe6e7aadf3de58bae44a7"
}, },
"pipfile-spec": 6, "pipfile-spec": 6,
"requires": { "requires": {
@ -18,9 +18,9 @@
"default": { "default": {
"asks": { "asks": {
"hashes": [ "hashes": [
"sha256:d76a6314ecd7d2f920d2e94b8d7bcbb7a0941aa4c915874869c503c757088df2" "sha256:c3fc1115dfeb414ef0863da6f60f02aea7487f92f76b645738774bf93e8577de"
], ],
"version": "==1.3.11" "version": "==2.0.0"
}, },
"async-generator": { "async-generator": {
"hashes": [ "hashes": [
@ -31,10 +31,10 @@
}, },
"attrs": { "attrs": {
"hashes": [ "hashes": [
"sha256:1c7960ccfd6a005cd9f7ba884e6316b5e430a3f1a6c37c5f87d8b43f83b54ec9", "sha256:4b90b09eeeb9b88c35bc642cbac057e45a5fd85367b985bd2809c62b7b939265",
"sha256:a17a9573a6f475c99b551c0e0a812707ddda1ec9653bed04c13841404ed6f450" "sha256:e0d0eb91441a3b53dab4d9b743eafc1ac44476296a2053b6ca3af0b139faf87b"
], ],
"version": "==17.4.0" "version": "==18.1.0"
}, },
"click": { "click": {
"hashes": [ "hashes": [
@ -52,46 +52,47 @@
}, },
"contextvars": { "contextvars": {
"hashes": [ "hashes": [
"sha256:e9f9c5763d5a2afa6e420218e53954bd3829fc2d8acd806b98139d28c362cdf9" "sha256:7d73f8b1426cf0200fbe16900fcd73c9be29c54546b48a3980727e670b1acb10"
], ],
"markers": "python_version < '3.7'", "markers": "python_version < '3.7'",
"version": "==2.1" "version": "==2.2"
}, },
"cython": { "cython": {
"hashes": [ "hashes": [
"sha256:03db8c1b8120039f72493b95494a595be13b01b6860cfc93e2a651a001847b3b", "sha256:0344e9352b0915910e212c38403b63f902ce1cba75dde7a43a9112ff960eb2a5",
"sha256:0d2ccb812d73e67557fd16e7aa7bc5bac18933c1dfe306133cd0680ccab89f33", "sha256:0a390c39e912fc5f82d5feae2d16ea061971407099e1efb0fecb255cb96fbeff",
"sha256:24f8ea864de733f5a447896cbeec2cac212247e33272539670b9f466f43f23db", "sha256:0f2b2e09f94c498f555935e732b7321b5f62f00e7a789238f6c5ddd66987a54d",
"sha256:30a8fd029eb932a7b5a74e158316d1d069ccb67a8607aa7b6c4ed19fab7fbd4a", "sha256:15614592616b6dd5e919e158796350ebeba6cb6b5d2998cfff41b53f568c8355",
"sha256:37e680901e6a4b97ab67717f9b43fc58542cd10a77431efd2d8801d21d5a37d4", "sha256:1aae6d6e9858888144cea147eb5e677830f45faaff3d305d77378c3cba55f526",
"sha256:4984e097bc9da37862d97c1f66dacf2c80fadaea488d96ba0b5ea9d84dbc7521", "sha256:200583297f23e558744bc4688d8a2b2605ab6ad7d1494a9fd8c8094ad65ebf3c",
"sha256:4cfda677227af41e4502e088ee9875e71922238a207d0c40785a0fb09c703c21", "sha256:295facc211a6b55db9979455b856180f2839be22ab767ffdea55986bee83ca9f",
"sha256:4ec60a4086a175a81b9258f810440a6dd2671aa4b419d8248546d85a7de6a93f", "sha256:36c16bf39280fe857213d8da31c07a6179d3878c3dc2e435dce0974b9f8f0729",
"sha256:51c7d48ea4cba532d11a6d128ebbc15373013f816e5d1c3a3946650b582a30b8", "sha256:3fef8dfa9cf86ab7814ca31369374ddd5b9524f54406aa83b53b5937965b8e88",
"sha256:634e2f10fc8d026c633cffacb45cd8f4582149fa68e1428124e762dbc566e68a", "sha256:439d233d3214e3d69c033a9a93516758f2c8a03e83ea51ae14b6eed13687d224",
"sha256:67e0359709c8addc3ecb19e1dec6d84d67647e3906da618b953001f6d4480275", "sha256:455ab39c6c0849a6c008fcdf2fae42475f18d0801a3be229e8f75367bbe3b325",
"sha256:6a93d4ba0461edc7a359241f4ebbaa8f9bc9490b3540a8dd0460bef8c2c706db", "sha256:56821e3791209e6a11992e294afbf7e3dcda7d4fd54d06396dd521928d3d14fe",
"sha256:6ba89d56c3ee45716378cda4f0490c3abe1edf79dce8b997f31608b14748a52b", "sha256:62b594584889b33bbea7e71f9d7c5c6539091b341334ef7ca1ae7e30a9dd3e15",
"sha256:6ca5436d470584ba6fd399a802c9d0bcf76cf1edb0123725a4de2f0048f9fa07", "sha256:70f81a75fb25c1c3c61843e3a6fe771a76c4ebf4d154455a7eff0740ad47dff4",
"sha256:7656895cdd59d56dd4ed326d1ee9ede727020d4a5d8778a05af2d8e25af4b13d", "sha256:8011090beb09251cb4ece1e14263e574b38eda696b788552b369ad343373d0e9",
"sha256:85f7432776870d65639fed00f951a3c05ef1e534bc72a73cd1200d79b9a7d7d0", "sha256:80d6a0369333a162fc32a22637f5870f3e87fb038c7b58860bbe00b05b58aa62",
"sha256:96dd674e72281d3feed74fd5adcf0514ba02884f123cdf4fb78567e7be6b1694", "sha256:85b04e32af58a3c008c0ba8169017770aaa342a5972b748f81d043d66363e437",
"sha256:97bf06a89bcf9e8d7633cde89274d42b3b661dc974b58fca066fad762e46b4d8", "sha256:9c4db4cfc8ac219b50484a505e3327142db04d5e9954aaed00ab729ef4beb142",
"sha256:9a465e7296a4629139be5d2015577f2ae5e08196eb7dc4c407beea130f362dc3", "sha256:9ed273d82116fa148c92901b9639030e087979d455982bd7bf727fb486c0bd17",
"sha256:9a60355edca1cc9006be086e2633e190542aad2bf9e46948792a48b3ae28ed97", "sha256:a1af59e6c9b4acc07c429d8495fc016a35e0a1270f28c57317352f512df7e214",
"sha256:9eab3696f2cb88167db109d737c787fb9dd34ca414bd1e0c424e307956e02c94", "sha256:b894ff4daf8dfaf657bf2d5e7190a4de11b2400b1e0fb0902974d35c23a26dea",
"sha256:c3ae7d40ebceb0d944dfeeceaf1fbf17e528f5327d97b008a8623ddddd1ecee3", "sha256:c2659981150b4de04397dcfd4bff64e384d3ba25af60d1b22820fdf108298cb2",
"sha256:c623d19fcc60ea27882f20cf484218926ddf6f978b958dae1070600a1974f809", "sha256:c347d0a129c9742fefeaecf2455576c5ae73362aa01a27cea26fac237b7e2a87",
"sha256:c719a6e86d7c737afcc9729994f76b284d1c512099ee803eff11c2a9e6e33a42", "sha256:c981a750858f1727995acf861ab030b267d264ca6efda2f01104941187a3675f",
"sha256:cf17af0433218a1e33dc6f3069dd9e7cd0c80fe505972c3acd548e25f67973fd", "sha256:cc4152b19ec168391f7815d24b70c8911829ba281bd5fcd98cab9dc21abe62ff",
"sha256:daf96e0d232605e979995795f62ffd24c5c6ecea4526e4cbb86d80f01da954b2", "sha256:d0f5b1668e7f7f6fc9849f49a20c5db10562a0ab29cd66818894dfebbca7b304",
"sha256:db40de7d03842d3c4625028a74189ade52b27f8efaeb0d2ca06474f57e0813b2", "sha256:d7152006ed1a3adb8f978077b57d237ddafa188240af53cd72b5c79e4ed000e3",
"sha256:deea1ef59445568dd7738fa3913aea7747e4927ff4ae3c10737844b8a5dd3e22", "sha256:e5f877472993474296125c22b84c334b550010815e513cccce73da854a132d64",
"sha256:e05d28b5ce1ee5939d83e50344980659688ecaed65c5e10214d817ecf5d1fe6a", "sha256:e7c2c87ff2f99ed4be1bb046d6eddfb388af627928037f9e0a420c05daaf14ed",
"sha256:f5f6694ce668eb7a9b59550bfe4265258809c9b0665c206b26d697df2eef2a8b" "sha256:edd7d499685655031be5b4d33005096b6345f81eeb7ab9d2dd415db0c7bcf64e",
"sha256:f99a777fda569a88deea863eac2722b5e88957c4d5f4413949740da791857ac9"
], ],
"index": "pypi", "index": "pypi",
"version": "==0.28.2" "version": "==0.28.3"
}, },
"e1839a8": { "e1839a8": {
"editable": true, "editable": true,
@ -112,14 +113,30 @@
}, },
"idna": { "idna": {
"hashes": [ "hashes": [
"sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f", "sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e",
"sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4" "sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16"
], ],
"version": "==2.6" "version": "==2.7"
},
"immutables": {
"hashes": [
"sha256:1614177d08408176a7b12313d6efefad41c3ba17e56cbeb404bb74f83cfdeaac",
"sha256:18cd84d5ff10ffd42db163d4aad88128d86c2939335b068852f8f399594521ee",
"sha256:4d29f0e6a880a20f94c9efd4672212c3276fa303621a4bdf2a5e4ba7f27cba84",
"sha256:5180319a7aebc9319e63e7ea663830d46566b84bed91cbcdb50e247d65fbda07",
"sha256:57d17742fccec4c0466ab658053fab1a327df42241e7b1cab28e5b85e7066ac1",
"sha256:75e2e4e9441938690821adc1ab00829a174f696aef1c52ae6aba767219d1e84f",
"sha256:812fd1124fca2f56a3a7f376f34ac44719f131d82b6001323aef84faa8be7f00",
"sha256:8a3d2fc0d9db57188f28d3e6cefe45e9905292965895a7a391be6ededa626811",
"sha256:95da22e2d16b47dbd3464ae447b94f8d74866bce045be216dd1691f2f1838a94",
"sha256:9c8e0267a4d35032ccc670e5ab52aba2693ded330c4c1f2a4870b4a7c86b65af",
"sha256:da1c24d6ab6b38604444b4b767b3bf6074dfa3f7d15d8192a3a4935f61f00bd6"
],
"version": "==0.5"
}, },
"kivy": { "kivy": {
"git": "git://github.com/matham/kivy.git", "git": "git://github.com/matham/kivy.git",
"ref": "async-loop" "ref": "4f62d0d8754ae2b590b44573200ea07b983f0f7b"
}, },
"msgpack": { "msgpack": {
"hashes": [ "hashes": [
@ -144,59 +161,57 @@
}, },
"multio": { "multio": {
"hashes": [ "hashes": [
"sha256:53fd38f5d90a5f1a5d2db507b73c474ef851f5465fab27ffabe401591808258a", "sha256:dcaee4d5d77cde8caf7902c8621aaa192febb384c7b1291fd47cfa41ac0eaebc"
"sha256:a6219395a1f84605c9041f0a7e8a529b989557c8a95920ddcd29fbed1d721758",
"sha256:f61bc6cf0ee8ea0ba32d5b9ae5ae1cadaebc39b6635a9b3d54142ded78164fe3"
], ],
"version": "==0.2.1" "version": "==0.2.3"
}, },
"numpy": { "numpy": {
"hashes": [ "hashes": [
"sha256:0739146eaf4985962f07c62f7133aca89f3a600faac891ce6c7f3a1e2afe5272", "sha256:07379fe0b450f6fd6e5934a9bc015025bb4ce1c8fbed3ca8bef29328b1bc9570",
"sha256:07e21f14490324cc1160db101e9b6c1233c33985af4cb1d301dd02650fea1d7f", "sha256:085afac75bbc97a096744fcfc97a4b321c5a87220286811e85089ae04885acdd",
"sha256:0f6a5ed0cd7ab1da11f5c07a8ecada73fc55a70ef7bb6311a4109891341d7277", "sha256:2d6481c6bdab1c75affc0fc71eb1bd4b3ecef620d06f2f60c3f00521d54be04f",
"sha256:0fd65cbbfdbf76bbf80c445d923b3accefea0fe2c2082049e0ce947c81fe1d3f", "sha256:2df854df882d322d5c23087a4959e145b953dfff2abe1774fec4f639ac2f3160",
"sha256:20cac3123d791e4bf8482a580d98d6b5969ba348b9d5364df791ba3a666b660d", "sha256:381ad13c30cd1d0b2f3da8a0c1a4aa697487e8bb0e9e0cbeb7439776bcb645f8",
"sha256:528ce59ded2008f9e8543e0146acb3a98a9890da00adf8904b1e18c82099418b", "sha256:385f1ce46e08676505b692bfde918c1e0b350963a15ef52d77691c2cf0f5dbf6",
"sha256:56e392b7c738bd70e6f46cf48c8194d3d1dd4c5a59fae4b30c58bb6ef86e5233", "sha256:4d278c2261be6423c5e63d8f0ceb1b0c6db3ff83f2906f4b860db6ae99ca1bb5",
"sha256:675e0f23967ce71067d12b6944add505d5f0a251f819cfb44bdf8ee7072c090d", "sha256:51c5dcb51cf88b34b7d04c15f600b07c6ccbb73a089a38af2ab83c02862318da",
"sha256:6be6b0ca705321c178c9858e5ad5611af664bbdfae1df1541f938a840a103888", "sha256:589336ba5199c8061239cf446ee2f2f1fcc0c68e8531ee1382b6fc0c66b2d388",
"sha256:719d914f564f35cce4dc103808f8297c807c9f0297ac183ed81ae8b5650e698e", "sha256:5edf1acc827ed139086af95ce4449b7b664f57a8c29eb755411a634be280d9f2",
"sha256:768e777cc1ffdbf97c507f65975c8686ebafe0f3dc8925d02ac117acc4669ce9", "sha256:6b82b81c6b3b70ed40bc6d0b71222ebfcd6b6c04a6e7945a936e514b9113d5a3",
"sha256:7f76d406c6b998d6410198dcb82688dcdaec7d846aa87e263ccf52efdcfeba30", "sha256:6c57f973218b776195d0356e556ec932698f3a563e2f640cfca7020086383f50",
"sha256:8c18ee4dddd5c6a811930c0a7c7947bf16387da3b394725f6063f1366311187d", "sha256:758d1091a501fd2d75034e55e7e98bfd1370dc089160845c242db1c760d944d9",
"sha256:99051e03b445117b26028623f1a487112ddf61a09a27e2d25e6bc07d37d94f25", "sha256:8622db292b766719810e0cb0f62ef6141e15fe32b04e4eb2959888319e59336b",
"sha256:a1413d06abfa942ca0553bf3bccaff5fdb36d55b84f2248e36228db871147dab", "sha256:8b8dcfcd630f1981f0f1e3846fae883376762a0c1b472baa35b145b911683b7b",
"sha256:a7157c9ac6bddd2908c35ef099e4b643bc0e0ebb4d653deb54891d29258dd329", "sha256:97fa8f1dceffab782069b291e38c4c2227f255cdac5f1e3346666931df87373e",
"sha256:a958bf9d4834c72dee4f91a0476e7837b8a2966dc6fcfc42c421405f98d0da51", "sha256:9d69967673ab7b028c2df09cae05ba56bf4e39e3cb04ebe452b6035c3b49848e",
"sha256:bb370120de6d26004358611441e07acda26840e41dfedc259d7f8cc613f96495", "sha256:9e1f53afae865cc32459ad211493cf9e2a3651a7295b7a38654ef3d123808996",
"sha256:d0928076d9bd8a98de44e79b1abe50c1456e7abbb40af7ef58092086f1a6c729", "sha256:a4a433b3a264dbc9aa9c7c241e87c0358a503ea6394f8737df1683c7c9a102ac",
"sha256:d858423f5ed444d494b15c4cc90a206e1b8c31354c781ac7584da0d21c09c1c3", "sha256:baadc5f770917ada556afb7651a68176559f4dca5f4b2d0947cd15b9fb84fb51",
"sha256:e6120d63b50e2248219f53302af7ec6fa2a42ed1f37e9cda2c76dbaca65036a7", "sha256:c725d11990a9243e6ceffe0ab25a07c46c1cc2c5dc55e305717b5afe856c9608",
"sha256:f2b1378b63bdb581d5d7af2ec0373c8d40d651941d283a2afd7fc71184b3f570", "sha256:d696a8c87315a83983fc59dd27efe034292b9e8ad667aeae51a68b4be14690d9",
"sha256:facc6f925c3099ac01a1f03758100772560a0b020fb9d70f210404be08006bcb" "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418"
], ],
"version": "==1.14.2" "version": "==1.14.5"
}, },
"pandas": { "pandas": {
"hashes": [ "hashes": [
"sha256:02541a4fdd31315f213a5c8e18708abad719ee03eda05f603c4fe973e9b9d770", "sha256:211cfdb9f72f26d2ede21c751d27e08fed4434d47fb9bb82ebc8ff753888b8b6",
"sha256:052a66f58783a59ea38fdfee25de083b107baa81fdbe38fabd169d0f9efce2bf", "sha256:28fd087514616549a0e3259cd68ac88d7eaed6bd3062017a7f312e27941266bd",
"sha256:06efae5c00b9f4c6e6d3fe1eb52e590ff0ea8e5cb58032c724e04d31c540de53", "sha256:2fb7c63138bd5ead296b18b2cb6abd3a394f7581e5ae052b02b27df8244b03ca",
"sha256:12f2a19d0b0adf31170d98d0e8bcbc59add0965a9b0c65d39e0665400491c0c5", "sha256:372435456c349a8d39ff001967b161f6bd29d4c3de145a4cf9b366648defbb1f",
"sha256:244ae0b9e998cfa88452a49b20e29bf582cc7c0e69093876d505aec4f8e1c7fe", "sha256:3790a3348ab0f416e58061d21693cb662fbb2f638001b94bf2b2199fedc1b1c2",
"sha256:2907f3fe91ca2119ac3c38de6891bbbc83333bfe0d98309768fee28de563ee7a", "sha256:437a6e906a6717a9ed2627cf6e7895b63dfaa0172567cbd75a553f55cf78cc17",
"sha256:44a94091dd71f05922eec661638ec1a35f26d573c119aa2fad964f10a2880e6c", "sha256:50b52af2af2e15f4aeb2fe196da073a8c131fa02e433e105d95ce40016df5690",
"sha256:587a9816cc663c958fcff7907c553b73fe196604f990bc98e1b71ebf07e45b44", "sha256:720daad75b5d35dd1b446842210c4f3fd447464c9c0884972f3f12b213a9edd1",
"sha256:66403162c8b45325a995493bdd78ad4d8be085e527d721dbfa773d56fbba9c88", "sha256:b4fb71acbc2709b8f5993cb4b5445d8182864f11c39787e317aae39f21206270",
"sha256:68ac484e857dcbbd07ea7c6f516cc67f7f143f5313d9bc661470e7f473528882", "sha256:b704fd73022342cce612996de495a16954311e0c0cf077c1b83d5cf0b9656a60",
"sha256:68b121d13177f5128a4c118bb4f73ba40df28292c038389961aa55ea5a996427", "sha256:cbbecca0c7af6a2160b2d6ba30becc286824a98c61dcc6a41fada664f226424c",
"sha256:97c8223d42d43d86ca359a57b4702ca0529c6553e83d736e93a5699951f0f8db", "sha256:d2a071de755cc8ee7784e1b4c7b9b643d951d35c8adea7d64fe7c57cff9c47a7",
"sha256:af0dbac881f6f87acd325415adea0ce8cccf28f5d4ad7a54b6a1e176e2f7bf70", "sha256:d8154c5c68713a82461aba735832f0b4692be8a45a0a340a303bf90d6f80f36f",
"sha256:c2cd884794924687edbaad40d18ac984054d247bb877890932c4d41e3c3aba31", "sha256:e1b86f7c55467ce1f6c12715f2fd1817f4a909b5c8c39bd4b5d2415ef2b04bd8",
"sha256:c372db80a5bcb143c9cb254d50f902772c3b093a4f965275197ec2d2184b1e61" "sha256:fcc63e8134516e93e16eb4ceac9afaa51f4adc5bf58efddae7cbc562f5b77dd0"
], ],
"version": "==0.22.0" "version": "==0.23.1"
}, },
"pdbpp": { "pdbpp": {
"hashes": [ "hashes": [
@ -214,10 +229,10 @@
}, },
"python-dateutil": { "python-dateutil": {
"hashes": [ "hashes": [
"sha256:3220490fb9741e2342e1cf29a503394fdac874bc39568288717ee67047ff29df", "sha256:1adb80e7a782c12e52ef9a8182bebeb73f1d7e24e374397af06fb4956c8dc5c0",
"sha256:9d8074be4c993fbe4947878ce593052f71dac82932a677d49194d8ce9778002e" "sha256:e27001de32f627c22380a688bcc43ce83504a7bc5da472209b4c70f02829f0b8"
], ],
"version": "==2.7.2" "version": "==2.7.3"
}, },
"pytz": { "pytz": {
"hashes": [ "hashes": [
@ -235,15 +250,14 @@
}, },
"sortedcontainers": { "sortedcontainers": {
"hashes": [ "hashes": [
"sha256:566cf5f8dbada3aed99737a19d98f03d15d76bf2a6c27e4fb0f4a718a99be761", "sha256:607294c6e291a270948420f7ffa1fb3ed47384a4c08db6d1e9c92d08a6981982",
"sha256:fa96e9920a37bde76bfdcaca919a125c1d2e581af1137e25de54ee0da7835282" "sha256:ef38b128302ee8f65d81e31c9d8fbf10d81df4d6d06c9c0b66f01d33747525bb"
], ],
"version": "==1.5.10" "version": "==2.0.4"
}, },
"trio": { "trio": {
"hashes": [ "hashes": [
"sha256:0496f575ed118eb382346a728971766d54b3a3e39e9825e94d4d513b0fe96145", "sha256:0496f575ed118eb382346a728971766d54b3a3e39e9825e94d4d513b0fe96145",
"sha256:8266525067dc6553592383bef1832556e6962daba765fa42a81880c5d9f4b785",
"sha256:fc5513551d22ec2be8bd05ddd56e9c3c377ac47c79a3866fa2d8710bfae4a0cb" "sha256:fc5513551d22ec2be8bd05ddd56e9c3c377ac47c79a3866fa2d8710bfae4a0cb"
], ],
"version": "==0.4.0" "version": "==0.4.0"
@ -258,9 +272,9 @@
"develop": { "develop": {
"asks": { "asks": {
"hashes": [ "hashes": [
"sha256:d76a6314ecd7d2f920d2e94b8d7bcbb7a0941aa4c915874869c503c757088df2" "sha256:c3fc1115dfeb414ef0863da6f60f02aea7487f92f76b645738774bf93e8577de"
], ],
"version": "==1.3.11" "version": "==2.0.0"
}, },
"async-generator": { "async-generator": {
"hashes": [ "hashes": [
@ -269,12 +283,19 @@
], ],
"version": "==1.9" "version": "==1.9"
}, },
"atomicwrites": {
"hashes": [
"sha256:240831ea22da9ab882b551b31d4225591e5e447a68c5e188db5b89ca1d487585",
"sha256:a24da68318b08ac9c9c45029f4a10371ab5b20e4226738e150e6e7c571630ae6"
],
"version": "==1.1.5"
},
"attrs": { "attrs": {
"hashes": [ "hashes": [
"sha256:1c7960ccfd6a005cd9f7ba884e6316b5e430a3f1a6c37c5f87d8b43f83b54ec9", "sha256:4b90b09eeeb9b88c35bc642cbac057e45a5fd85367b985bd2809c62b7b939265",
"sha256:a17a9573a6f475c99b551c0e0a812707ddda1ec9653bed04c13841404ed6f450" "sha256:e0d0eb91441a3b53dab4d9b743eafc1ac44476296a2053b6ca3af0b139faf87b"
], ],
"version": "==17.4.0" "version": "==18.1.0"
}, },
"click": { "click": {
"hashes": [ "hashes": [
@ -292,46 +313,47 @@
}, },
"contextvars": { "contextvars": {
"hashes": [ "hashes": [
"sha256:e9f9c5763d5a2afa6e420218e53954bd3829fc2d8acd806b98139d28c362cdf9" "sha256:7d73f8b1426cf0200fbe16900fcd73c9be29c54546b48a3980727e670b1acb10"
], ],
"markers": "python_version < '3.7'", "markers": "python_version < '3.7'",
"version": "==2.1" "version": "==2.2"
}, },
"cython": { "cython": {
"hashes": [ "hashes": [
"sha256:03db8c1b8120039f72493b95494a595be13b01b6860cfc93e2a651a001847b3b", "sha256:0344e9352b0915910e212c38403b63f902ce1cba75dde7a43a9112ff960eb2a5",
"sha256:0d2ccb812d73e67557fd16e7aa7bc5bac18933c1dfe306133cd0680ccab89f33", "sha256:0a390c39e912fc5f82d5feae2d16ea061971407099e1efb0fecb255cb96fbeff",
"sha256:24f8ea864de733f5a447896cbeec2cac212247e33272539670b9f466f43f23db", "sha256:0f2b2e09f94c498f555935e732b7321b5f62f00e7a789238f6c5ddd66987a54d",
"sha256:30a8fd029eb932a7b5a74e158316d1d069ccb67a8607aa7b6c4ed19fab7fbd4a", "sha256:15614592616b6dd5e919e158796350ebeba6cb6b5d2998cfff41b53f568c8355",
"sha256:37e680901e6a4b97ab67717f9b43fc58542cd10a77431efd2d8801d21d5a37d4", "sha256:1aae6d6e9858888144cea147eb5e677830f45faaff3d305d77378c3cba55f526",
"sha256:4984e097bc9da37862d97c1f66dacf2c80fadaea488d96ba0b5ea9d84dbc7521", "sha256:200583297f23e558744bc4688d8a2b2605ab6ad7d1494a9fd8c8094ad65ebf3c",
"sha256:4cfda677227af41e4502e088ee9875e71922238a207d0c40785a0fb09c703c21", "sha256:295facc211a6b55db9979455b856180f2839be22ab767ffdea55986bee83ca9f",
"sha256:4ec60a4086a175a81b9258f810440a6dd2671aa4b419d8248546d85a7de6a93f", "sha256:36c16bf39280fe857213d8da31c07a6179d3878c3dc2e435dce0974b9f8f0729",
"sha256:51c7d48ea4cba532d11a6d128ebbc15373013f816e5d1c3a3946650b582a30b8", "sha256:3fef8dfa9cf86ab7814ca31369374ddd5b9524f54406aa83b53b5937965b8e88",
"sha256:634e2f10fc8d026c633cffacb45cd8f4582149fa68e1428124e762dbc566e68a", "sha256:439d233d3214e3d69c033a9a93516758f2c8a03e83ea51ae14b6eed13687d224",
"sha256:67e0359709c8addc3ecb19e1dec6d84d67647e3906da618b953001f6d4480275", "sha256:455ab39c6c0849a6c008fcdf2fae42475f18d0801a3be229e8f75367bbe3b325",
"sha256:6a93d4ba0461edc7a359241f4ebbaa8f9bc9490b3540a8dd0460bef8c2c706db", "sha256:56821e3791209e6a11992e294afbf7e3dcda7d4fd54d06396dd521928d3d14fe",
"sha256:6ba89d56c3ee45716378cda4f0490c3abe1edf79dce8b997f31608b14748a52b", "sha256:62b594584889b33bbea7e71f9d7c5c6539091b341334ef7ca1ae7e30a9dd3e15",
"sha256:6ca5436d470584ba6fd399a802c9d0bcf76cf1edb0123725a4de2f0048f9fa07", "sha256:70f81a75fb25c1c3c61843e3a6fe771a76c4ebf4d154455a7eff0740ad47dff4",
"sha256:7656895cdd59d56dd4ed326d1ee9ede727020d4a5d8778a05af2d8e25af4b13d", "sha256:8011090beb09251cb4ece1e14263e574b38eda696b788552b369ad343373d0e9",
"sha256:85f7432776870d65639fed00f951a3c05ef1e534bc72a73cd1200d79b9a7d7d0", "sha256:80d6a0369333a162fc32a22637f5870f3e87fb038c7b58860bbe00b05b58aa62",
"sha256:96dd674e72281d3feed74fd5adcf0514ba02884f123cdf4fb78567e7be6b1694", "sha256:85b04e32af58a3c008c0ba8169017770aaa342a5972b748f81d043d66363e437",
"sha256:97bf06a89bcf9e8d7633cde89274d42b3b661dc974b58fca066fad762e46b4d8", "sha256:9c4db4cfc8ac219b50484a505e3327142db04d5e9954aaed00ab729ef4beb142",
"sha256:9a465e7296a4629139be5d2015577f2ae5e08196eb7dc4c407beea130f362dc3", "sha256:9ed273d82116fa148c92901b9639030e087979d455982bd7bf727fb486c0bd17",
"sha256:9a60355edca1cc9006be086e2633e190542aad2bf9e46948792a48b3ae28ed97", "sha256:a1af59e6c9b4acc07c429d8495fc016a35e0a1270f28c57317352f512df7e214",
"sha256:9eab3696f2cb88167db109d737c787fb9dd34ca414bd1e0c424e307956e02c94", "sha256:b894ff4daf8dfaf657bf2d5e7190a4de11b2400b1e0fb0902974d35c23a26dea",
"sha256:c3ae7d40ebceb0d944dfeeceaf1fbf17e528f5327d97b008a8623ddddd1ecee3", "sha256:c2659981150b4de04397dcfd4bff64e384d3ba25af60d1b22820fdf108298cb2",
"sha256:c623d19fcc60ea27882f20cf484218926ddf6f978b958dae1070600a1974f809", "sha256:c347d0a129c9742fefeaecf2455576c5ae73362aa01a27cea26fac237b7e2a87",
"sha256:c719a6e86d7c737afcc9729994f76b284d1c512099ee803eff11c2a9e6e33a42", "sha256:c981a750858f1727995acf861ab030b267d264ca6efda2f01104941187a3675f",
"sha256:cf17af0433218a1e33dc6f3069dd9e7cd0c80fe505972c3acd548e25f67973fd", "sha256:cc4152b19ec168391f7815d24b70c8911829ba281bd5fcd98cab9dc21abe62ff",
"sha256:daf96e0d232605e979995795f62ffd24c5c6ecea4526e4cbb86d80f01da954b2", "sha256:d0f5b1668e7f7f6fc9849f49a20c5db10562a0ab29cd66818894dfebbca7b304",
"sha256:db40de7d03842d3c4625028a74189ade52b27f8efaeb0d2ca06474f57e0813b2", "sha256:d7152006ed1a3adb8f978077b57d237ddafa188240af53cd72b5c79e4ed000e3",
"sha256:deea1ef59445568dd7738fa3913aea7747e4927ff4ae3c10737844b8a5dd3e22", "sha256:e5f877472993474296125c22b84c334b550010815e513cccce73da854a132d64",
"sha256:e05d28b5ce1ee5939d83e50344980659688ecaed65c5e10214d817ecf5d1fe6a", "sha256:e7c2c87ff2f99ed4be1bb046d6eddfb388af627928037f9e0a420c05daaf14ed",
"sha256:f5f6694ce668eb7a9b59550bfe4265258809c9b0665c206b26d697df2eef2a8b" "sha256:edd7d499685655031be5b4d33005096b6345f81eeb7ab9d2dd415db0c7bcf64e",
"sha256:f99a777fda569a88deea863eac2722b5e88957c4d5f4413949740da791857ac9"
], ],
"index": "pypi", "index": "pypi",
"version": "==0.28.2" "version": "==0.28.3"
}, },
"e1839a8": { "e1839a8": {
"editable": true, "editable": true,
@ -352,18 +374,34 @@
}, },
"idna": { "idna": {
"hashes": [ "hashes": [
"sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f", "sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e",
"sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4" "sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16"
], ],
"version": "==2.6" "version": "==2.7"
},
"immutables": {
"hashes": [
"sha256:1614177d08408176a7b12313d6efefad41c3ba17e56cbeb404bb74f83cfdeaac",
"sha256:18cd84d5ff10ffd42db163d4aad88128d86c2939335b068852f8f399594521ee",
"sha256:4d29f0e6a880a20f94c9efd4672212c3276fa303621a4bdf2a5e4ba7f27cba84",
"sha256:5180319a7aebc9319e63e7ea663830d46566b84bed91cbcdb50e247d65fbda07",
"sha256:57d17742fccec4c0466ab658053fab1a327df42241e7b1cab28e5b85e7066ac1",
"sha256:75e2e4e9441938690821adc1ab00829a174f696aef1c52ae6aba767219d1e84f",
"sha256:812fd1124fca2f56a3a7f376f34ac44719f131d82b6001323aef84faa8be7f00",
"sha256:8a3d2fc0d9db57188f28d3e6cefe45e9905292965895a7a391be6ededa626811",
"sha256:95da22e2d16b47dbd3464ae447b94f8d74866bce045be216dd1691f2f1838a94",
"sha256:9c8e0267a4d35032ccc670e5ab52aba2693ded330c4c1f2a4870b4a7c86b65af",
"sha256:da1c24d6ab6b38604444b4b767b3bf6074dfa3f7d15d8192a3a4935f61f00bd6"
],
"version": "==0.5"
}, },
"more-itertools": { "more-itertools": {
"hashes": [ "hashes": [
"sha256:0dd8f72eeab0d2c3bd489025bb2f6a1b8342f9b198f6fc37b52d15cfa4531fea", "sha256:2b6b9893337bfd9166bee6a62c2b0c9fe7735dcf85948b387ec8cba30e85d8e8",
"sha256:11a625025954c20145b37ff6309cd54e39ca94f72f6bb9576d1195db6fa2442e", "sha256:6703844a52d3588f951883005efcf555e49566a48afd4db4e965d69b883980d3",
"sha256:c9ce7eccdcb901a2c75d326ea134e0886abfbea5f93e91cc95de9507c0816c44" "sha256:a18d870ef2ffca2b8463c0070ad17b5978056f403fb64e3f15fe62a52db21cc0"
], ],
"version": "==4.1.0" "version": "==4.2.0"
}, },
"msgpack": { "msgpack": {
"hashes": [ "hashes": [
@ -388,59 +426,57 @@
}, },
"multio": { "multio": {
"hashes": [ "hashes": [
"sha256:53fd38f5d90a5f1a5d2db507b73c474ef851f5465fab27ffabe401591808258a", "sha256:dcaee4d5d77cde8caf7902c8621aaa192febb384c7b1291fd47cfa41ac0eaebc"
"sha256:a6219395a1f84605c9041f0a7e8a529b989557c8a95920ddcd29fbed1d721758",
"sha256:f61bc6cf0ee8ea0ba32d5b9ae5ae1cadaebc39b6635a9b3d54142ded78164fe3"
], ],
"version": "==0.2.1" "version": "==0.2.3"
}, },
"numpy": { "numpy": {
"hashes": [ "hashes": [
"sha256:0739146eaf4985962f07c62f7133aca89f3a600faac891ce6c7f3a1e2afe5272", "sha256:07379fe0b450f6fd6e5934a9bc015025bb4ce1c8fbed3ca8bef29328b1bc9570",
"sha256:07e21f14490324cc1160db101e9b6c1233c33985af4cb1d301dd02650fea1d7f", "sha256:085afac75bbc97a096744fcfc97a4b321c5a87220286811e85089ae04885acdd",
"sha256:0f6a5ed0cd7ab1da11f5c07a8ecada73fc55a70ef7bb6311a4109891341d7277", "sha256:2d6481c6bdab1c75affc0fc71eb1bd4b3ecef620d06f2f60c3f00521d54be04f",
"sha256:0fd65cbbfdbf76bbf80c445d923b3accefea0fe2c2082049e0ce947c81fe1d3f", "sha256:2df854df882d322d5c23087a4959e145b953dfff2abe1774fec4f639ac2f3160",
"sha256:20cac3123d791e4bf8482a580d98d6b5969ba348b9d5364df791ba3a666b660d", "sha256:381ad13c30cd1d0b2f3da8a0c1a4aa697487e8bb0e9e0cbeb7439776bcb645f8",
"sha256:528ce59ded2008f9e8543e0146acb3a98a9890da00adf8904b1e18c82099418b", "sha256:385f1ce46e08676505b692bfde918c1e0b350963a15ef52d77691c2cf0f5dbf6",
"sha256:56e392b7c738bd70e6f46cf48c8194d3d1dd4c5a59fae4b30c58bb6ef86e5233", "sha256:4d278c2261be6423c5e63d8f0ceb1b0c6db3ff83f2906f4b860db6ae99ca1bb5",
"sha256:675e0f23967ce71067d12b6944add505d5f0a251f819cfb44bdf8ee7072c090d", "sha256:51c5dcb51cf88b34b7d04c15f600b07c6ccbb73a089a38af2ab83c02862318da",
"sha256:6be6b0ca705321c178c9858e5ad5611af664bbdfae1df1541f938a840a103888", "sha256:589336ba5199c8061239cf446ee2f2f1fcc0c68e8531ee1382b6fc0c66b2d388",
"sha256:719d914f564f35cce4dc103808f8297c807c9f0297ac183ed81ae8b5650e698e", "sha256:5edf1acc827ed139086af95ce4449b7b664f57a8c29eb755411a634be280d9f2",
"sha256:768e777cc1ffdbf97c507f65975c8686ebafe0f3dc8925d02ac117acc4669ce9", "sha256:6b82b81c6b3b70ed40bc6d0b71222ebfcd6b6c04a6e7945a936e514b9113d5a3",
"sha256:7f76d406c6b998d6410198dcb82688dcdaec7d846aa87e263ccf52efdcfeba30", "sha256:6c57f973218b776195d0356e556ec932698f3a563e2f640cfca7020086383f50",
"sha256:8c18ee4dddd5c6a811930c0a7c7947bf16387da3b394725f6063f1366311187d", "sha256:758d1091a501fd2d75034e55e7e98bfd1370dc089160845c242db1c760d944d9",
"sha256:99051e03b445117b26028623f1a487112ddf61a09a27e2d25e6bc07d37d94f25", "sha256:8622db292b766719810e0cb0f62ef6141e15fe32b04e4eb2959888319e59336b",
"sha256:a1413d06abfa942ca0553bf3bccaff5fdb36d55b84f2248e36228db871147dab", "sha256:8b8dcfcd630f1981f0f1e3846fae883376762a0c1b472baa35b145b911683b7b",
"sha256:a7157c9ac6bddd2908c35ef099e4b643bc0e0ebb4d653deb54891d29258dd329", "sha256:97fa8f1dceffab782069b291e38c4c2227f255cdac5f1e3346666931df87373e",
"sha256:a958bf9d4834c72dee4f91a0476e7837b8a2966dc6fcfc42c421405f98d0da51", "sha256:9d69967673ab7b028c2df09cae05ba56bf4e39e3cb04ebe452b6035c3b49848e",
"sha256:bb370120de6d26004358611441e07acda26840e41dfedc259d7f8cc613f96495", "sha256:9e1f53afae865cc32459ad211493cf9e2a3651a7295b7a38654ef3d123808996",
"sha256:d0928076d9bd8a98de44e79b1abe50c1456e7abbb40af7ef58092086f1a6c729", "sha256:a4a433b3a264dbc9aa9c7c241e87c0358a503ea6394f8737df1683c7c9a102ac",
"sha256:d858423f5ed444d494b15c4cc90a206e1b8c31354c781ac7584da0d21c09c1c3", "sha256:baadc5f770917ada556afb7651a68176559f4dca5f4b2d0947cd15b9fb84fb51",
"sha256:e6120d63b50e2248219f53302af7ec6fa2a42ed1f37e9cda2c76dbaca65036a7", "sha256:c725d11990a9243e6ceffe0ab25a07c46c1cc2c5dc55e305717b5afe856c9608",
"sha256:f2b1378b63bdb581d5d7af2ec0373c8d40d651941d283a2afd7fc71184b3f570", "sha256:d696a8c87315a83983fc59dd27efe034292b9e8ad667aeae51a68b4be14690d9",
"sha256:facc6f925c3099ac01a1f03758100772560a0b020fb9d70f210404be08006bcb" "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418"
], ],
"version": "==1.14.2" "version": "==1.14.5"
}, },
"pandas": { "pandas": {
"hashes": [ "hashes": [
"sha256:02541a4fdd31315f213a5c8e18708abad719ee03eda05f603c4fe973e9b9d770", "sha256:211cfdb9f72f26d2ede21c751d27e08fed4434d47fb9bb82ebc8ff753888b8b6",
"sha256:052a66f58783a59ea38fdfee25de083b107baa81fdbe38fabd169d0f9efce2bf", "sha256:28fd087514616549a0e3259cd68ac88d7eaed6bd3062017a7f312e27941266bd",
"sha256:06efae5c00b9f4c6e6d3fe1eb52e590ff0ea8e5cb58032c724e04d31c540de53", "sha256:2fb7c63138bd5ead296b18b2cb6abd3a394f7581e5ae052b02b27df8244b03ca",
"sha256:12f2a19d0b0adf31170d98d0e8bcbc59add0965a9b0c65d39e0665400491c0c5", "sha256:372435456c349a8d39ff001967b161f6bd29d4c3de145a4cf9b366648defbb1f",
"sha256:244ae0b9e998cfa88452a49b20e29bf582cc7c0e69093876d505aec4f8e1c7fe", "sha256:3790a3348ab0f416e58061d21693cb662fbb2f638001b94bf2b2199fedc1b1c2",
"sha256:2907f3fe91ca2119ac3c38de6891bbbc83333bfe0d98309768fee28de563ee7a", "sha256:437a6e906a6717a9ed2627cf6e7895b63dfaa0172567cbd75a553f55cf78cc17",
"sha256:44a94091dd71f05922eec661638ec1a35f26d573c119aa2fad964f10a2880e6c", "sha256:50b52af2af2e15f4aeb2fe196da073a8c131fa02e433e105d95ce40016df5690",
"sha256:587a9816cc663c958fcff7907c553b73fe196604f990bc98e1b71ebf07e45b44", "sha256:720daad75b5d35dd1b446842210c4f3fd447464c9c0884972f3f12b213a9edd1",
"sha256:66403162c8b45325a995493bdd78ad4d8be085e527d721dbfa773d56fbba9c88", "sha256:b4fb71acbc2709b8f5993cb4b5445d8182864f11c39787e317aae39f21206270",
"sha256:68ac484e857dcbbd07ea7c6f516cc67f7f143f5313d9bc661470e7f473528882", "sha256:b704fd73022342cce612996de495a16954311e0c0cf077c1b83d5cf0b9656a60",
"sha256:68b121d13177f5128a4c118bb4f73ba40df28292c038389961aa55ea5a996427", "sha256:cbbecca0c7af6a2160b2d6ba30becc286824a98c61dcc6a41fada664f226424c",
"sha256:97c8223d42d43d86ca359a57b4702ca0529c6553e83d736e93a5699951f0f8db", "sha256:d2a071de755cc8ee7784e1b4c7b9b643d951d35c8adea7d64fe7c57cff9c47a7",
"sha256:af0dbac881f6f87acd325415adea0ce8cccf28f5d4ad7a54b6a1e176e2f7bf70", "sha256:d8154c5c68713a82461aba735832f0b4692be8a45a0a340a303bf90d6f80f36f",
"sha256:c2cd884794924687edbaad40d18ac984054d247bb877890932c4d41e3c3aba31", "sha256:e1b86f7c55467ce1f6c12715f2fd1817f4a909b5c8c39bd4b5d2415ef2b04bd8",
"sha256:c372db80a5bcb143c9cb254d50f902772c3b093a4f965275197ec2d2184b1e61" "sha256:fcc63e8134516e93e16eb4ceac9afaa51f4adc5bf58efddae7cbc562f5b77dd0"
], ],
"version": "==0.22.0" "version": "==0.23.1"
}, },
"pdbpp": { "pdbpp": {
"hashes": [ "hashes": [
@ -459,10 +495,10 @@
}, },
"py": { "py": {
"hashes": [ "hashes": [
"sha256:29c9fab495d7528e80ba1e343b958684f4ace687327e6f789a94bf3d1915f881", "sha256:3fd59af7435864e1a243790d322d763925431213b6b8529c6ca71081ace3bbf7",
"sha256:983f77f3331356039fdd792e9220b7b8ee1aa6bd2b25f567a963ff1de5a64f6a" "sha256:e31fb2767eb657cbde86c454f02e99cb846d3cd9d61b318525140214fdc0e98e"
], ],
"version": "==1.5.3" "version": "==1.5.4"
}, },
"pygments": { "pygments": {
"hashes": [ "hashes": [
@ -473,18 +509,18 @@
}, },
"pytest": { "pytest": {
"hashes": [ "hashes": [
"sha256:6266f87ab64692112e5477eba395cfedda53b1933ccd29478e671e73b420c19c", "sha256:8ea01fc4fcc8e1b1e305252b4bc80a1528019ab99fd3b88666c9dc38d754406c",
"sha256:fae491d1874f199537fd5872b5e1f0e74a009b979df9d53d1553fd03da1703e1" "sha256:90898786b3d0b880b47645bae7b51aa9bbf1e9d1e4510c2cfd15dd65c70ea0cd"
], ],
"index": "pypi", "index": "pypi",
"version": "==3.5.0" "version": "==3.6.2"
}, },
"python-dateutil": { "python-dateutil": {
"hashes": [ "hashes": [
"sha256:3220490fb9741e2342e1cf29a503394fdac874bc39568288717ee67047ff29df", "sha256:1adb80e7a782c12e52ef9a8182bebeb73f1d7e24e374397af06fb4956c8dc5c0",
"sha256:9d8074be4c993fbe4947878ce593052f71dac82932a677d49194d8ce9778002e" "sha256:e27001de32f627c22380a688bcc43ce83504a7bc5da472209b4c70f02829f0b8"
], ],
"version": "==2.7.2" "version": "==2.7.3"
}, },
"pytz": { "pytz": {
"hashes": [ "hashes": [
@ -502,15 +538,14 @@
}, },
"sortedcontainers": { "sortedcontainers": {
"hashes": [ "hashes": [
"sha256:566cf5f8dbada3aed99737a19d98f03d15d76bf2a6c27e4fb0f4a718a99be761", "sha256:607294c6e291a270948420f7ffa1fb3ed47384a4c08db6d1e9c92d08a6981982",
"sha256:fa96e9920a37bde76bfdcaca919a125c1d2e581af1137e25de54ee0da7835282" "sha256:ef38b128302ee8f65d81e31c9d8fbf10d81df4d6d06c9c0b66f01d33747525bb"
], ],
"version": "==1.5.10" "version": "==2.0.4"
}, },
"trio": { "trio": {
"hashes": [ "hashes": [
"sha256:0496f575ed118eb382346a728971766d54b3a3e39e9825e94d4d513b0fe96145", "sha256:0496f575ed118eb382346a728971766d54b3a3e39e9825e94d4d513b0fe96145",
"sha256:8266525067dc6553592383bef1832556e6962daba765fa42a81880c5d9f4b785",
"sha256:fc5513551d22ec2be8bd05ddd56e9c3c377ac47c79a3866fa2d8710bfae4a0cb" "sha256:fc5513551d22ec2be8bd05ddd56e9c3c377ac47c79a3866fa2d8710bfae4a0cb"
], ],
"version": "==0.4.0" "version": "==0.4.0"

View File

@ -8,11 +8,14 @@ import socket
from types import ModuleType from types import ModuleType
from typing import Coroutine, Callable from typing import Coroutine, Callable
import msgpack
import trio import trio
from .. import tractor
from ..log import get_logger from ..log import get_logger
from ..ipc import Channel
from . import get_brokermod from . import get_brokermod
log = get_logger('broker.core') log = get_logger('broker.core')
@ -69,138 +72,13 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict:
await trio.sleep(sleep) await trio.sleep(sleep)
class StreamQueue:
"""Stream wrapped as a queue that delivers ``msgpack`` serialized objects.
"""
def __init__(self, stream):
self.stream = stream
self.peer = stream.socket.getpeername()
self._agen = self._iter_packets()
async def _iter_packets(self):
"""Yield packets from the underlying stream.
"""
unpacker = msgpack.Unpacker(raw=False)
while True:
try:
data = await self.stream.receive_some(2**10)
log.trace(f"Data is {data}")
except trio.BrokenStreamError:
log.error(f"Stream connection {self.peer} broke")
return
if data == b'':
log.debug("Stream connection was closed")
return
unpacker.feed(data)
for packet in unpacker:
yield packet
async def put(self, data):
return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True))
async def get(self):
return await self._agen.asend(None)
async def __aiter__(self):
return self._agen
class Client:
"""The most basic client.
Use this to talk to any micro-service daemon or other client(s) over a
TCP socket managed by ``trio``.
"""
def __init__(
self, sockaddr: tuple,
on_reconnect: Coroutine,
auto_reconnect: bool = True,
):
self.sockaddr = sockaddr
self._recon_seq = on_reconnect
self._autorecon = auto_reconnect
self.squeue = None
async def connect(self, sockaddr: tuple = None, **kwargs):
sockaddr = sockaddr or self.sockaddr
stream = await trio.open_tcp_stream(*sockaddr, **kwargs)
self.squeue = StreamQueue(stream)
return stream
async def send(self, item):
await self.squeue.put(item)
async def recv(self):
try:
return await self.squeue.get()
except trio.BrokenStreamError as err:
if self._autorecon:
await self._reconnect()
return await self.recv()
async def aclose(self, *args):
await self.squeue.stream.aclose()
async def __aenter__(self):
await self.connect(self.sockaddr)
return self
async def __aexit__(self, *args):
await self.aclose(*args)
async def _reconnect(self):
"""Handle connection failures by polling until a reconnect can be
established.
"""
down = False
while True:
try:
with trio.move_on_after(3) as cancel_scope:
await self.connect()
cancelled = cancel_scope.cancelled_caught
if cancelled:
log.warn(
"Reconnect timed out after 3 seconds, retrying...")
continue
else:
log.warn("Stream connection re-established!")
# run any reconnection sequence
await self._recon_seq(self)
break
except (OSError, ConnectionRefusedError):
if not down:
down = True
log.warn(
f"Connection to {self.sockaddr} went down, waiting"
" for re-establishment")
await trio.sleep(1)
async def aiter_recv(self):
"""Async iterate items from underlying stream.
"""
while True:
try:
async for item in self.squeue:
yield item
except trio.BrokenStreamError as err:
if not self._autorecon:
raise
if self._autorecon: # attempt reconnect
await self._reconnect()
continue
else:
return
async def stream_quotes( async def stream_quotes(
brokermod: ModuleType, brokermod: ModuleType,
get_quotes: Coroutine, get_quotes: Coroutine,
tickers2qs: {str: StreamQueue}, tickers2chans: {str: Channel},
rate: int = 5, # delay between quote requests rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue diff_cached: bool = True, # only deliver "new" quotes to the queue
cid: str = None,
) -> None: ) -> None:
"""Stream quotes for a sequence of tickers at the given ``rate`` """Stream quotes for a sequence of tickers at the given ``rate``
per second. per second.
@ -219,11 +97,11 @@ async def stream_quotes(
while True: # use an event here to trigger exit? while True: # use an event here to trigger exit?
prequote_start = time.time() prequote_start = time.time()
if not any(tickers2qs.values()): if not any(tickers2chans.values()):
log.warn(f"No subs left for broker {brokermod.name}, exiting task") log.warn(f"No subs left for broker {brokermod.name}, exiting task")
break break
tickers = list(tickers2qs.keys()) tickers = list(tickers2chans.keys())
with trio.move_on_after(3) as cancel_scope: with trio.move_on_after(3) as cancel_scope:
quotes = await get_quotes(tickers) quotes = await get_quotes(tickers)
@ -234,7 +112,7 @@ async def stream_quotes(
quotes = await wait_for_network(partial(get_quotes, tickers)) quotes = await wait_for_network(partial(get_quotes, tickers))
postquote_start = time.time() postquote_start = time.time()
q_payloads = {} chan_payloads = {}
for symbol, quote in quotes.items(): for symbol, quote in quotes.items():
if diff_cached: if diff_cached:
# if cache is enabled then only deliver "new" changes # if cache is enabled then only deliver "new" changes
@ -244,25 +122,31 @@ async def stream_quotes(
log.info( log.info(
f"New quote {quote['symbol']}:\n{new}") f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote _cache[symbol] = quote
for queue in tickers2qs[symbol]: for chan, cid in tickers2chans.get(symbol, set()):
q_payloads.setdefault(queue, {})[symbol] = quote chan_payloads.setdefault(
chan,
{'yield': {}, 'cid': cid}
)['yield'][symbol] = quote
else: else:
for queue in tickers2qs[symbol]: for chan, cid in tickers2chans[symbol]:
q_payloads.setdefault(queue, {})[symbol] = quote chan_payloads.setdefault(
chan,
{'yield': {}, 'cid': cid}
)['yield'][symbol] = quote
# deliver to each subscriber # deliver to each subscriber
if q_payloads: if chan_payloads:
for queue, payload in q_payloads.items(): for chan, payload in chan_payloads.items():
try: try:
await queue.put(payload) await chan.send(payload)
except ( except (
# That's right, anything you can think of... # That's right, anything you can think of...
trio.ClosedStreamError, ConnectionResetError, trio.ClosedStreamError, ConnectionResetError,
ConnectionRefusedError, ConnectionRefusedError,
): ):
log.warn(f"{queue.peer} went down?") log.warn(f"{chan} went down?")
for qset in tickers2qs.values(): for chanset in tickers2chans.values():
qset.discard(queue) chanset.discard((chan, cid))
req_time = round(postquote_start - prequote_start, 3) req_time = round(postquote_start - prequote_start, 3)
proc_time = round(time.time() - postquote_start, 3) proc_time = round(time.time() - postquote_start, 3)
@ -277,13 +161,104 @@ async def stream_quotes(
log.debug(f"Sleeping for {delay}") log.debug(f"Sleeping for {delay}")
await trio.sleep(delay) await trio.sleep(delay)
log.info(f"Terminating stream quoter task for {brokermod.name}")
async def start_quoter(
broker2tickersubs: dict, async def get_cached_client(broker, tickers):
clients: dict, """Get the current actor's cached broker client if available or create a
dtasks: set, # daemon task registry new one.
nursery: "Nusery", """
stream: trio.SocketStream, # check if a cached client is in the local actor's statespace
clients = tractor.current_actor().statespace.setdefault('clients', {})
try:
return clients[broker]
except KeyError:
log.info(f"Creating new client for broker {broker}")
brokermod = get_brokermod(broker)
# TODO: move to AsyncExitStack in 3.7
client_cntxmng = brokermod.get_client()
client = await client_cntxmng.__aenter__()
get_quotes = await brokermod.quoter(client, tickers)
clients[broker] = (
brokermod, client, client_cntxmng, get_quotes)
return brokermod, client, client_cntxmng, get_quotes
async def symbol_data(broker, tickers):
"""Retrieve baseline symbol info from broker.
"""
_, client, _, get_quotes = await get_cached_client(broker, tickers)
return await client.symbol_data(tickers)
async def smoke_quote(get_quotes, tickers, broker):
"""Do an initial "smoke" request for symbols in ``tickers`` filtering
out any symbols not supported by the broker queried in the call to
``get_quotes()``.
"""
# TODO: trim out with #37
#################################################
# get a single quote filtering out any bad tickers
# NOTE: this code is always run for every new client
# subscription even when a broker quoter task is already running
# since the new client needs to know what symbols are accepted
log.warn(f"Retrieving smoke quote for symbols {tickers}")
quotes = await get_quotes(tickers)
# report any tickers that aren't returned in the first quote
invalid_tickers = set(tickers) - set(quotes)
for symbol in invalid_tickers:
tickers.remove(symbol)
log.warn(
f"Symbol `{symbol}` not found by broker `{broker}`"
)
# pop any tickers that return "empty" quotes
payload = {}
for symbol, quote in quotes.items():
if quote is None:
log.warn(
f"Symbol `{symbol}` not found by broker"
f" `{broker}`")
# XXX: not this mutates the input list (for now)
tickers.remove(symbol)
continue
payload[symbol] = quote
return payload
# end of section to be trimmed out with #37
###########################################
def modify_quote_stream(broker, tickers, chan=None, cid=None):
"""Absolute symbol subscription list for each quote stream.
"""
log.info(f"{chan} changed symbol subscription to {tickers}")
ss = tractor.current_actor().statespace
broker2tickersubs = ss['broker2tickersubs']
tickers2chans = broker2tickersubs.get(broker)
# update map from each symbol to requesting client's chan
for ticker in tickers:
tickers2chans.setdefault(ticker, set()).add((chan, cid))
for ticker in filter(
lambda ticker: ticker not in tickers, tickers2chans.copy()
):
chanset = tickers2chans.get(ticker)
if chanset:
chanset.discard((chan, cid))
if not chanset:
# pop empty sets which will trigger bg quoter task termination
tickers2chans.pop(ticker)
async def start_quote_stream(
broker: str,
tickers: [str],
chan: 'Channel' = None,
cid: str = None,
) -> None: ) -> None:
"""Handle per-broker quote stream subscriptions. """Handle per-broker quote stream subscriptions.
@ -291,116 +266,76 @@ async def start_quoter(
Since most brokers seems to support batch quote requests we Since most brokers seems to support batch quote requests we
limit to one task per process for now. limit to one task per process for now.
""" """
queue = StreamQueue(stream) # wrap in a shabby queue-like api # pull global vars from local actor
log.info(f"Accepted new connection from {queue.peer}") ss = tractor.current_actor().statespace
async with queue.stream: broker2tickersubs = ss['broker2tickersubs']
async for broker, tickers in queue: clients = ss['clients']
log.info( dtasks = ss['dtasks']
f"{queue.peer} subscribed to {broker} for tickers {tickers}") tickers = list(tickers)
log.info(
f"{chan.uid} subscribed to {broker} for tickers {tickers}")
if broker not in broker2tickersubs: brokermod, client, _, get_quotes = await get_cached_client(broker, tickers)
brokermod = get_brokermod(broker) if broker not in broker2tickersubs:
tickers2chans = broker2tickersubs.setdefault(broker, {})
else:
log.info(f"Subscribing with existing `{broker}` daemon")
tickers2chans = broker2tickersubs[broker]
# TODO: move to AsyncExitStack in 3.7 # do a smoke quote (not this mutates the input list and filters out bad
client_cntxmng = brokermod.get_client() # symbols for now)
client = await client_cntxmng.__aenter__() payload = await smoke_quote(get_quotes, tickers, broker)
get_quotes = await brokermod.quoter(client, tickers) # push initial smoke quote response for client initialization
clients[broker] = ( await chan.send({'yield': payload, 'cid': cid})
brokermod, client, client_cntxmng, get_quotes)
tickers2qs = broker2tickersubs.setdefault(broker, {})
else:
log.info(f"Subscribing with existing `{broker}` daemon")
brokermod, client, _, get_quotes = clients[broker]
tickers2qs = broker2tickersubs[broker]
# beginning of section to be trimmed out with #37 # update map from each symbol to requesting client's chan
################################################# modify_quote_stream(broker, tickers, chan=chan, cid=cid)
# get a single quote filtering out any bad tickers
# NOTE: this code is always run for every new client try:
# subscription even when a broker quoter task is already running if broker not in dtasks: # no quoter task yet
# since the new client needs to know what symbols are accepted # task should begin on the next checkpoint/iteration
log.warn(f"Retrieving smoke quote for {queue.peer}") # with trio.open_cancel_scope(shield=True):
quotes = await get_quotes(tickers) log.info(f"Spawning quoter task for {brokermod.name}")
# report any tickers that aren't returned in the first quote # await actor._root_nursery.start(partial(
invalid_tickers = set(tickers) - set(quotes) async with trio.open_nursery() as nursery:
for symbol in invalid_tickers: nursery.start_soon(partial(
tickers.remove(symbol) stream_quotes, brokermod, get_quotes, tickers2chans,
log.warn( cid=cid)
f"Symbol `{symbol}` not found by broker `{brokermod.name}`"
) )
# pop any tickers that return "empty" quotes
payload = {}
for symbol, quote in quotes.items():
if quote is None:
log.warn(
f"Symbol `{symbol}` not found by broker"
f" `{brokermod.name}`")
tickers.remove(symbol)
continue
payload[symbol] = quote
# end of section to be trimmed out with #37
###########################################
# first respond with symbol data for all tickers (allows
# clients to receive broker specific setup info)
sd = await client.symbol_data(tickers)
assert sd, "No symbol data could be found?"
await queue.put(sd)
# update map from each symbol to requesting client's queue
for ticker in tickers:
tickers2qs.setdefault(ticker, set()).add(queue)
# push initial quotes response for client initialization
await queue.put(payload)
if broker not in dtasks: # no quoter task yet
# task should begin on the next checkpoint/iteration
log.info(f"Spawning quoter task for {brokermod.name}")
nursery.start_soon(
stream_quotes, brokermod, get_quotes, tickers2qs)
dtasks.add(broker) dtasks.add(broker)
log.debug("Waiting on subscription request") # unblocks when no more symbols subscriptions exist and the
else: # quote streamer task terminates (usually because another call
log.info(f"client @ {queue.peer} disconnected") # was made to `modify_quoter` to unsubscribe from streaming
# drop any lingering subscriptions # symbols)
for ticker, qset in tickers2qs.items(): log.info(f"Terminated quoter task for {brokermod.name}")
qset.discard(queue)
# if there are no more subscriptions with this broker # TODO: move to AsyncExitStack in 3.7
# drop from broker subs dict for _, _, cntxmng, _ in clients.values():
if not any(tickers2qs.values()): # FIXME: yes I know there's no error handling..
log.info(f"No more subscriptions for {broker}") await cntxmng.__aexit__(None, None, None)
broker2tickersubs.pop(broker, None) finally:
dtasks.discard(broker) # if there are truly no more subscriptions with this broker
# drop from broker subs dict
# TODO: move to AsyncExitStack in 3.7 if not any(tickers2chans.values()):
for _, _, cntxmng, _ in clients.values(): log.info(f"No more subscriptions for {broker}")
# FIXME: yes I know it's totally wrong... broker2tickersubs.pop(broker, None)
await cntxmng.__aexit__(None, None, None) dtasks.discard(broker)
async def _daemon_main(host) -> None: async def _test_price_stream(broker, symbols, *, chan=None, cid=None):
"""Entry point for the broker daemon which waits for connections """Test function for initial tractor draft.
before spawning micro-services.
""" """
# global space for broker-daemon subscriptions brokermod = get_brokermod(broker)
broker2tickersubs = {} client_cntxmng = brokermod.get_client()
clients = {} client = await client_cntxmng.__aenter__()
dtasks = set() get_quotes = await brokermod.quoter(client, symbols)
log.info(f"Spawning quoter task for {brokermod.name}")
assert chan
tickers2chans = {}.fromkeys(symbols, {(chan, cid), })
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
listeners = await nursery.start( nursery.start_soon(
partial( partial(
trio.serve_tcp, stream_quotes, brokermod, get_quotes, tickers2chans, cid=cid)
partial(
start_quoter, broker2tickersubs, clients,
dtasks, nursery
),
1616, host=host,
)
) )
log.debug(f"Spawned {listeners}")

View File

@ -2,7 +2,6 @@
Console interface to broker client/daemons. Console interface to broker client/daemons.
""" """
from functools import partial from functools import partial
from multiprocessing import Process
import json import json
import os import os
@ -12,8 +11,8 @@ import trio
from . import watchlists as wl from . import watchlists as wl
from .brokers import core, get_brokermod from .brokers import core, get_brokermod
from .brokers.core import _daemon_main, Client
from .log import get_console_log, colorize_json, get_logger from .log import get_console_log, colorize_json, get_logger
from . import tractor
log = get_logger('cli') log = get_logger('cli')
DEFAULT_BROKER = 'robinhood' DEFAULT_BROKER = 'robinhood'
@ -22,18 +21,24 @@ _config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
def run(main, loglevel='info'):
get_console_log(loglevel)
return trio.run(main)
@click.command() @click.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') @click.option('--host', '-h', default='127.0.0.1', help='Host address to bind')
def pikerd(loglevel, host): def pikerd(loglevel, host):
"""Spawn the piker daemon. """Spawn the piker daemon.
""" """
run(partial(_daemon_main, host), loglevel) get_console_log(loglevel)
tractor.run(
None, # no main task - this is a daemon
statespace={
'broker2tickersubs': {},
'clients': {},
'dtasks': set(),
},
outlive_main=True, # run daemon forever
rpc_module_paths=['piker.brokers.core'],
name='brokerd',
)
@click.group() @click.group()
@ -52,7 +57,7 @@ def cli():
def api(meth, kwargs, loglevel, broker, keys): def api(meth, kwargs, loglevel, broker, keys):
"""client for testing broker API methods with pretty printing of output. """client for testing broker API methods with pretty printing of output.
""" """
log = get_console_log(loglevel) get_console_log(loglevel)
brokermod = get_brokermod(broker) brokermod = get_brokermod(broker)
_kwargs = {} _kwargs = {}
@ -63,8 +68,9 @@ def api(meth, kwargs, loglevel, broker, keys):
key, _, value = kwarg.partition('=') key, _, value = kwarg.partition('=')
_kwargs[key] = value _kwargs[key] = value
data = run( data = trio.run(
partial(core.api, brokermod, meth, **_kwargs), loglevel=loglevel) partial(core.api, brokermod, meth, **_kwargs)
)
if keys: if keys:
# filter to requested keys # filter to requested keys
@ -88,10 +94,12 @@ def api(meth, kwargs, loglevel, broker, keys):
help='Ouput in `pandas.DataFrame` format') help='Ouput in `pandas.DataFrame` format')
@click.argument('tickers', nargs=-1, required=True) @click.argument('tickers', nargs=-1, required=True)
def quote(loglevel, broker, tickers, df_output): def quote(loglevel, broker, tickers, df_output):
"""client for testing broker API methods with pretty printing of output. """Retreive symbol quotes on the console in either json or dataframe
format.
""" """
brokermod = get_brokermod(broker) brokermod = get_brokermod(broker)
quotes = run(partial(core.quote, brokermod, tickers), loglevel=loglevel) get_console_log(loglevel)
quotes = trio.run(partial(core.quote, brokermod, tickers))
if not quotes: if not quotes:
log.error(f"No quotes could be found for {tickers}?") log.error(f"No quotes could be found for {tickers}?")
return return
@ -132,48 +140,28 @@ def watch(loglevel, broker, rate, name, dhost):
async def launch_client(sleep=0.5, tries=10): async def launch_client(sleep=0.5, tries=10):
async def subscribe(client): async with tractor.open_nursery() as nursery:
# initial subs request for symbols async with tractor.find_actor('brokerd') as portal:
await client.send((brokermod.name, tickers)) if not portal:
# symbol data is returned in first response which we'll log.warn("No broker daemon could be found")
# ignore on reconnect log.warning("Spawning local brokerd..")
await client.recv() portal = await nursery.start_actor(
'brokerd',
main=None, # no main task
statespace={
'broker2tickersubs': {},
'clients': {},
'dtasks': set(),
},
outlive_main=True, # run daemon forever
rpc_module_paths=['piker.brokers.core'],
loglevel=loglevel,
)
client = Client((dhost, 1616), on_reconnect=subscribe) # run kivy app
for _ in range(tries): # try for 5 seconds await _async_main(name, portal, tickers, brokermod, rate)
try:
await client.connect()
break
except OSError as oserr:
await trio.sleep(sleep)
else:
# will raise indicating child proc should be spawned
await client.connect()
async with trio.open_nursery() as nursery: tractor.run(partial(launch_client, tries=1), name='kivy-watchlist')
nursery.start_soon(
_async_main, name, client, tickers,
brokermod, rate
)
# signal exit of stream handler task
await client.aclose()
try:
trio.run(partial(launch_client, tries=1))
except OSError as oserr:
log.warn("No broker daemon could be found")
log.warn(oserr)
log.warning("Spawning local broker-daemon...")
child = Process(
target=run,
args=(partial(_daemon_main, dhost), loglevel),
daemon=True,
name='pikerd',
)
child.start()
trio.run(partial(launch_client, tries=5))
child.join()
@cli.group() @cli.group()

191
piker/ipc.py 100644
View File

@ -0,0 +1,191 @@
"""
Inter-process comms abstractions
"""
from typing import Coroutine, Tuple
import msgpack
import trio
from .log import get_logger
log = get_logger('ipc')
class StreamQueue:
"""Stream wrapped as a queue that delivers ``msgpack`` serialized objects.
"""
def __init__(self, stream):
self.stream = stream
self._agen = self._iter_packets()
self._laddr = self.stream.socket.getsockname()[:2]
self._raddr = self.stream.socket.getpeername()[:2]
self._send_lock = trio.Lock()
async def _iter_packets(self):
"""Yield packets from the underlying stream.
"""
unpacker = msgpack.Unpacker(raw=False, use_list=False)
while True:
try:
data = await self.stream.receive_some(2**10)
log.trace(f"received {data}")
except trio.BrokenStreamError:
log.error(f"Stream connection {self.raddr} broke")
return
if data == b'':
log.debug(f"Stream connection {self.raddr} was closed")
return
unpacker.feed(data)
for packet in unpacker:
yield packet
@property
def laddr(self):
return self._laddr
@property
def raddr(self):
return self._raddr
async def put(self, data):
async with self._send_lock:
return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True))
async def get(self):
return await self._agen.asend(None)
async def __aiter__(self):
return self._agen
def connected(self):
return self.stream.socket.fileno() != -1
class Channel:
"""A channel to actors in other processes.
Use this to talk to any micro-service daemon or other client(s) over a
a transport managed by ``trio``.
"""
def __init__(
self,
destaddr: tuple = None,
on_reconnect: Coroutine = None,
auto_reconnect: bool = False,
stream: trio.SocketStream = None, # expected to be active
) -> None:
self._recon_seq = on_reconnect
self._autorecon = auto_reconnect
self.squeue = StreamQueue(stream) if stream else None
if self.squeue and destaddr:
raise ValueError(
f"A stream was provided with local addr {self.laddr}"
)
self._destaddr = destaddr or self.squeue.raddr
# set after handshake - always uid of far end
self.uid = None
def __repr__(self):
if self.squeue:
return repr(
self.squeue.stream.socket._sock).replace(
"socket.socket", "Channel")
return object.__repr__(self)
@property
def laddr(self):
return self.squeue.laddr if self.squeue else (None, None)
@property
def raddr(self):
return self.squeue.raddr if self.squeue else (None, None)
async def connect(self, destaddr: Tuple[str, int] = None, **kwargs):
if self.connected():
raise RuntimeError("channel is already connected?")
destaddr = destaddr or self._destaddr
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
self.squeue = StreamQueue(stream)
return stream
async def send(self, item):
log.trace(f"send `{item}`")
await self.squeue.put(item)
async def recv(self):
try:
return await self.squeue.get()
except trio.BrokenStreamError:
if self._autorecon:
await self._reconnect()
return await self.recv()
async def aclose(self, *args):
log.debug(f"Closing {self}")
await self.squeue.stream.aclose()
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, *args):
await self.aclose(*args)
async def __aiter__(self):
return self.aiter_recv()
async def _reconnect(self):
"""Handle connection failures by polling until a reconnect can be
established.
"""
down = False
while True:
try:
with trio.move_on_after(3) as cancel_scope:
await self.connect()
cancelled = cancel_scope.cancelled_caught
if cancelled:
log.warn(
"Reconnect timed out after 3 seconds, retrying...")
continue
else:
log.warn("Stream connection re-established!")
# run any reconnection sequence
on_recon = self._recon_seq
if on_recon:
await on_recon(self)
break
except (OSError, ConnectionRefusedError):
if not down:
down = True
log.warn(
f"Connection to {self.raddr} went down, waiting"
" for re-establishment")
await trio.sleep(1)
async def aiter_recv(self):
"""Async iterate items from underlying stream.
"""
while True:
try:
async for item in self.squeue:
yield item
# sent = yield item
# if sent is not None:
# # optimization, passing None through all the
# # time is pointless
# await self.squeue.put(sent)
except trio.BrokenStreamError:
if not self._autorecon:
raise
await self.aclose()
if self._autorecon: # attempt reconnect
await self._reconnect()
continue
else:
return
def connected(self):
return self.squeue.connected() if self.squeue else False

899
piker/tractor.py 100644
View File

@ -0,0 +1,899 @@
"""
tracor: An actor model micro-framework.
"""
from collections import defaultdict
from functools import partial
from typing import Coroutine
import importlib
import inspect
import multiprocessing as mp
import traceback
import uuid
import trio
from async_generator import asynccontextmanager
from .ipc import Channel
from .log import get_console_log, get_logger
ctx = mp.get_context("forkserver")
log = get_logger('tractor')
# set at startup and after forks
_current_actor = None
_default_arbiter_host = '127.0.0.1'
_default_arbiter_port = 1616
class ActorFailure(Exception):
"General actor failure"
class RemoteActorError(ActorFailure):
"Remote actor exception bundled locally"
@asynccontextmanager
async def maybe_open_nursery(nursery=None):
"""Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided.
"""
if nursery is not None:
yield nursery
else:
async with trio.open_nursery() as nursery:
yield nursery
async def _invoke(
cid, chan, func, kwargs,
treat_as_gen=False, raise_errs=False,
task_status=trio.TASK_STATUS_IGNORED
):
"""Invoke local func and return results over provided channel.
"""
try:
is_async_partial = False
if isinstance(func, partial):
is_async_partial = inspect.iscoroutinefunction(func.func)
if not inspect.iscoroutinefunction(func) and not is_async_partial:
await chan.send({'return': func(**kwargs), 'cid': cid})
else:
coro = func(**kwargs)
if inspect.isasyncgen(coro):
async for item in coro:
# TODO: can we send values back in here?
# it's gonna require a `while True:` and
# some non-blocking way to retrieve new `asend()`
# values from the channel:
# to_send = await chan.recv_nowait()
# if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid})
else:
if treat_as_gen:
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
await coro
else:
await chan.send({'return': await coro, 'cid': cid})
task_status.started()
except Exception:
if not raise_errs:
await chan.send({'error': traceback.format_exc(), 'cid': cid})
else:
raise
async def result_from_q(q):
"""Process a msg from a remote actor.
"""
first_msg = await q.get()
if 'return' in first_msg:
return 'return', first_msg, q
elif 'yield' in first_msg:
return 'yield', first_msg, q
elif 'error' in first_msg:
raise RemoteActorError(first_msg['error'])
else:
raise ValueError(f"{first_msg} is an invalid response packet?")
async def _do_handshake(actor, chan):
await chan.send(actor.uid)
uid = await chan.recv()
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid
class Actor:
"""The fundamental concurrency primitive.
An *actor* is the combination of a regular Python or
``multiprocessing.Process`` executing a ``trio`` task tree, communicating
with other actors through "portals" which provide a native async API
around "channels".
"""
is_arbiter = False
def __init__(
self,
name: str,
main: Coroutine = None,
rpc_module_paths: [str] = [],
statespace: dict = {},
uid: str = None,
allow_rpc: bool = True,
outlive_main: bool = False,
):
self.name = name
self.uid = (name, uid or str(uuid.uuid1()))
self.rpc_module_paths = rpc_module_paths
self._mods = {}
self.main = main
# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
self.statespace = statespace
self._allow_rpc = allow_rpc
self._outlive_main = outlive_main
# filled in by `_async_main` after fork
self._peers = defaultdict(list)
self._no_more_peers = trio.Event()
self._no_more_peers.set()
self._actors2calls = {} # map {uids -> {callids -> waiter queues}}
self._listeners = []
self._parent_chan = None
self._accept_host = None
async def wait_for_peer(self, uid):
"""Wait for a connection back from a spawned actor with a given
``uid``.
"""
log.debug(f"Waiting for peer {uid} to connect")
event = self._peers.setdefault(uid, trio.Event())
await event.wait()
log.debug(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1]
def load_namespaces(self):
# We load namespaces after fork since this actor may
# be spawned on a different machine from the original nursery
# and we need to try and load the local module code (if it
# exists)
for path in self.rpc_module_paths:
self._mods[path] = importlib.import_module(path)
async def _stream_handler(
self,
stream: trio.SocketStream,
):
"""
Entry point for new inbound connections to the channel server.
"""
self._no_more_peers.clear()
chan = Channel(stream=stream)
log.info(f"New connection to us {chan}")
# send/receive initial handshake response
try:
uid = await _do_handshake(self, chan)
except StopAsyncIteration:
log.warn(f"Channel {chan} failed to handshake")
return
# channel tracking
event_or_chans = self._peers.pop(uid, None)
if isinstance(event_or_chans, trio.Event):
# Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via
# async-rpc calls.
log.debug(f"Waking channel waiters {event_or_chans.statistics()}")
# Alert any task waiting on this connection to come up
event_or_chans.set()
event_or_chans.clear() # consumer can wait on channel to close
elif isinstance(event_or_chans, list):
log.warn(
f"already have channel(s) for {uid}:{event_or_chans}?"
)
# append new channel
self._peers[uid].extend(event_or_chans)
log.debug(f"Registered {chan} for {uid}")
self._peers[uid].append(chan)
# Begin channel management - respond to remote requests and
# process received reponses.
try:
await self._process_messages(chan)
finally:
# Drop ref to channel so it can be gc-ed and disconnected
if chan is not self._parent_chan:
log.debug(f"Releasing channel {chan}")
chans = self._peers.get(chan.uid)
chans.remove(chan)
if not chans:
log.debug(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None)
if not self._peers: # no more channels connected
self._no_more_peers.set()
log.debug(f"No more peer channels")
def _push_result(self, actorid, cid, msg):
assert actorid, f"`actorid` can't be {actorid}"
q = self.get_waitq(actorid, cid)
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
waiters = q.statistics().tasks_waiting_get
if not waiters:
log.warn(
f"No tasks are currently waiting for results from call {cid}?")
q.put_nowait(msg)
def get_waitq(self, actorid, cid):
log.debug(f"Registering for callid {cid} queue results from {actorid}")
cids2qs = self._actors2calls.setdefault(actorid, {})
return cids2qs.setdefault(cid, trio.Queue(1000))
async def send_cmd(self, chan, ns, func, kwargs):
"""Send a ``'cmd'`` message to a remote actor and return a
caller id and a ``trio.Queue`` that can be used to wait for
responses delivered by the local message processing loop.
"""
cid = str(uuid.uuid1())
q = self.get_waitq(chan.uid, cid)
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
return cid, q
async def _process_messages(self, chan, treat_as_gen=False):
"""Process messages async-RPC style.
Process rpc requests and deliver retrieved responses from channels.
"""
# TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that!
log.debug(f"Entering msg loop for {chan}")
async with trio.open_nursery() as nursery:
try:
async for msg in chan.aiter_recv():
if msg is None: # terminate sentinel
log.debug(f"Cancelling all tasks for {chan}")
nursery.cancel_scope.cancel()
log.debug(f"Terminating msg loop for {chan}")
break
log.debug(f"Received msg {msg}")
cid = msg.get('cid')
if cid: # deliver response to local caller/waiter
self._push_result(chan.uid, cid, msg)
if 'error' in msg:
# TODO: need something better then this slop
raise RemoteActorError(msg['error'])
log.debug(f"Waiting on next msg for {chan}")
continue
else:
ns, funcname, kwargs, actorid, cid = msg['cmd']
log.debug(
f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
func = getattr(self, funcname)
else:
func = getattr(self._mods[ns], funcname)
# spin up a task for the requested function
sig = inspect.signature(func)
treat_as_gen = False
if 'chan' in sig.parameters:
assert 'cid' in sig.parameters, \
f"{func} must accept a `cid` (caller id) kwarg"
kwargs['chan'] = chan
kwargs['cid'] = cid
# TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator.
# Right now both actual async gens and any async
# function which declares a `chan` kwarg in its
# signature will be treated as one.
treat_as_gen = True
log.debug(f"Spawning task for {func}")
nursery.start_soon(
_invoke, cid, chan, func, kwargs, treat_as_gen,
name=funcname
)
log.debug(f"Waiting on next msg for {chan}")
else: # channel disconnect
log.debug(f"{chan} disconnected")
except trio.ClosedStreamError:
log.error(f"{chan} broke")
log.debug(f"Exiting msg loop for {chan}")
def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):
# after fork routine which invokes a fresh ``trio.run``
log.info(
f"Started new {ctx.current_process()} for actor {self.uid}")
global _current_actor
_current_actor = self
if loglevel:
get_console_log(loglevel)
log.debug(f"parent_addr is {parent_addr}")
try:
trio.run(partial(
self._async_main, accept_addr, parent_addr=parent_addr))
except KeyboardInterrupt:
pass # handle it the same way trio does?
log.debug(f"Actor {self.uid} terminated")
async def _async_main(
self,
accept_addr,
arbiter_addr=(_default_arbiter_host, _default_arbiter_port),
parent_addr=None,
nursery=None
):
"""Start the channel server and main task.
A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor.
"""
result = None
try:
async with maybe_open_nursery(nursery) as nursery:
self._root_nursery = nursery
# Startup up channel server
host, port = accept_addr
await nursery.start(partial(
self._serve_forever, accept_host=host, accept_port=port)
)
if parent_addr is not None:
# Connect back to the parent actor and conduct initial
# handshake (From this point on if we error ship the
# exception back to the parent actor)
chan = self._parent_chan = Channel(
destaddr=parent_addr,
on_reconnect=self.main
)
await chan.connect()
# initial handshake, report who we are, who they are
await _do_handshake(self, chan)
# handle new connection back to parent optionally
# begin responding to RPC
if self._allow_rpc:
self.load_namespaces()
if self._parent_chan:
nursery.start_soon(
self._process_messages, self._parent_chan)
# register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`")
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'register_actor',
name=self.name, sockaddr=self.accept_addr)
if self.main:
if self._parent_chan:
log.debug(f"Starting main task `{self.main}`")
# start "main" routine in a task
await nursery.start(
_invoke, 'main', self._parent_chan, self.main, {},
False, True # treat_as_gen, raise_errs params
)
else:
# run directly
log.debug(f"Running `{self.main}` directly")
result = await self.main()
# terminate local in-proc once its main completes
log.debug(
f"Waiting for remaining peers {self._peers} to clear")
await self._no_more_peers.wait()
log.debug(f"All peer channels are complete")
# tear down channel server
if not self._outlive_main:
log.debug(f"Shutting down channel server")
self.cancel_server()
# blocks here as expected if no nursery was provided until
# the channel server is killed (i.e. this actor is
# cancelled or signalled by the parent actor)
except Exception:
if self._parent_chan:
log.exception("Actor errored:")
await self._parent_chan.send(
{'error': traceback.format_exc(), 'cid': 'main'})
else:
raise
finally:
# UNregister actor from the arbiter
try:
if arbiter_addr is not None:
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'register_actor',
name=self.name, sockaddr=self.accept_addr)
except OSError:
log.warn(f"Unable to unregister {self.name} from arbiter")
return result
async def _serve_forever(
self,
*,
# (host, port) to bind for channel server
accept_host=None,
accept_port=0,
task_status=trio.TASK_STATUS_IGNORED
):
"""Main coroutine: connect back to the parent, spawn main task, begin
listening for new messages.
"""
async with trio.open_nursery() as nursery:
self._server_nursery = nursery
# TODO: might want to consider having a separate nursery
# for the stream handler such that the server can be cancelled
# whilst leaving existing channels up
listeners = await nursery.start(
partial(
trio.serve_tcp,
self._stream_handler,
handler_nursery=self._root_nursery,
port=accept_port, host=accept_host,
)
)
log.debug(
f"Started tcp server(s) on {[l.socket for l in listeners]}")
self._listeners.extend(listeners)
task_status.started()
def cancel(self):
"""This cancels the internal root-most nursery thereby gracefully
cancelling (for all intents and purposes) this actor.
"""
self._root_nursery.cancel_scope.cancel()
def cancel_server(self):
"""Cancel the internal channel server nursery thereby
preventing any new inbound connections from being established.
"""
self._server_nursery.cancel_scope.cancel()
@property
def accept_addr(self):
"""Primary address to which the channel server is bound.
"""
try:
return self._listeners[0].socket.getsockname()
except OSError:
return
def get_parent(self):
return Portal(self._parent_chan)
def get_chans(self, actorid):
return self._peers[actorid]
class Arbiter(Actor):
"""A special actor who knows all the other actors and always has
access to the top level nursery.
The arbiter is by default the first actor spawned on each host
and is responsible for keeping track of all other actors for
coordination purposes. If a new main process is launched and an
arbiter is already running that arbiter will be used.
"""
_registry = defaultdict(list)
is_arbiter = True
def find_actor(self, name):
return self._registry[name]
def register_actor(self, name, sockaddr):
self._registry[name].append(sockaddr)
def unregister_actor(self, name, sockaddr):
sockaddrs = self._registry.get(name)
if sockaddrs:
try:
sockaddrs.remove(sockaddr)
except ValueError:
pass
class Portal:
"""A 'portal' to a(n) (remote) ``Actor``.
Allows for invoking remote routines and receiving results through an
underlying ``tractor.Channel`` as though the remote (async)
function / generator was invoked locally.
Think of this like an native async IPC API.
"""
def __init__(self, channel):
self.channel = channel
async def aclose(self):
log.debug(f"Closing {self}")
# XXX: won't work until https://github.com/python-trio/trio/pull/460
# gets in!
await self.channel.aclose()
async def run(self, ns, func, **kwargs):
"""Submit a function to be scheduled and run by actor, return its
(stream of) result(s).
"""
# TODO: not this needs some serious work and thinking about how
# to make async-generators the fundamental IPC API over channels!
# (think `yield from`, `gen.send()`, and functional reactive stuff)
chan = self.channel
# ship a function call request to the remote actor
actor = current_actor()
cid, q = await actor.send_cmd(chan, ns, func, kwargs)
# wait on first response msg
resptype, first_msg, q = await result_from_q(q)
if resptype == 'yield':
async def yield_from_q():
yield first_msg['yield']
try:
async for msg in q:
try:
yield msg['yield']
except KeyError:
raise RemoteActorError(msg['error'])
except GeneratorExit:
log.debug(f"Cancelling async gen call {cid} to {chan.uid}")
return yield_from_q()
elif resptype == 'return':
return first_msg['return']
else:
raise ValueError(f"Unknown msg response type: {first_msg}")
@asynccontextmanager
async def open_portal(channel, nursery=None):
"""Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle rpc message processing.
"""
actor = current_actor()
assert actor
was_connected = False
async with maybe_open_nursery(nursery) as nursery:
if not channel.connected():
await channel.connect()
was_connected = True
if channel.uid is None:
await _do_handshake(actor, channel)
if not actor.get_chans(channel.uid):
# actor is not currently managing this channel
actor._peers[channel.uid].append(channel)
nursery.start_soon(actor._process_messages, channel)
yield Portal(channel)
# cancel background msg loop task
nursery.cancel_scope.cancel()
if was_connected:
actor._peers[channel.uid].remove(channel)
await channel.aclose()
class LocalPortal:
"""A 'portal' to a local ``Actor``.
A compatibility shim for normal portals but for invoking functions
using an in process actor instance.
"""
def __init__(self, actor):
self.actor = actor
async def run(self, ns, func, **kwargs):
"""Run a requested function locally and return it's result.
"""
obj = self.actor if ns == 'self' else importlib.import_module(ns)
func = getattr(obj, func)
return func(**kwargs)
class ActorNursery:
"""Spawn scoped subprocess actors.
"""
def __init__(self, actor, supervisor=None):
self.supervisor = supervisor
self._actor = actor
# We'll likely want some way to cancel all sub-actors eventually
# self.cancel_scope = cancel_scope
self._children = {}
async def __aenter__(self):
return self
async def start_actor(
self,
name: str,
main=None,
bind_addr=('127.0.0.1', 0),
statespace=None,
rpc_module_paths=None,
outlive_main=False, # sub-actors die when their main task completes
loglevel=None, # set console logging per subactor
):
actor = Actor(
name,
# modules allowed to invoked funcs from
rpc_module_paths=rpc_module_paths,
statespace=statespace, # global proc state vars
main=main, # main coroutine to be invoked
outlive_main=outlive_main,
)
parent_addr = self._actor.accept_addr
assert parent_addr
proc = ctx.Process(
target=actor._fork_main,
args=(bind_addr, parent_addr, loglevel),
daemon=True,
name=name,
)
proc.start()
if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await self._actor.wait_for_peer(actor.uid)
# channel is up, get queue which delivers result from main routine
main_q = self._actor.get_waitq(actor.uid, 'main')
self._children[(name, proc.pid)] = (actor, proc, main_q)
return Portal(chan)
async def wait(self):
async def wait_for_proc(proc):
# TODO: timeout block here?
if proc.is_alive():
await trio.hazmat.wait_readable(proc.sentinel)
# please god don't hang
proc.join()
log.debug(f"Joined {proc}")
# unblocks when all waiter tasks have completed
async with trio.open_nursery() as nursery:
for subactor, proc, main_q in self._children.values():
nursery.start_soon(wait_for_proc, proc)
async def cancel(self, hard_kill=False):
log.debug(f"Cancelling nursery")
for subactor, proc, main_q in self._children.values():
if proc is mp.current_process():
# XXX: does this even make sense?
await subactor.cancel()
else:
if hard_kill:
log.warn(f"Hard killing subactors {self._children}")
proc.terminate()
# send KeyBoardInterrupt (trio abort signal) to underlying
# sub-actors
# os.kill(proc.pid, signal.SIGINT)
else:
# send cancel cmd - likely no response from subactor
actor = self._actor
chans = actor.get_chans(subactor.uid)
if chans:
for chan in chans:
await actor.send_cmd(chan, 'self', 'cancel', {})
else:
log.warn(
f"Channel for {subactor.uid} is already down?")
log.debug(f"Waiting on all subactors to complete")
await self.wait()
log.debug(f"All subactors for {self} have terminated")
async def __aexit__(self, etype, value, tb):
"""Wait on all subactor's main routines to complete.
"""
async def wait_for_actor(actor, proc, q):
if proc.is_alive():
ret_type, msg, q = await result_from_q(q)
log.info(f"{actor.uid} main task completed with {msg}")
if not actor._outlive_main:
# trigger msg loop to break
chans = self._actor.get_chans(actor.uid)
for chan in chans:
log.info(f"Signalling msg loop exit for {actor.uid}")
await chan.send(None)
if etype is not None:
log.warn(f"{current_actor().uid} errored with {etype}, "
"cancelling actor nursery")
await self.cancel()
else:
log.debug(f"Waiting on subactors to complete")
async with trio.open_nursery() as nursery:
for subactor, proc, main_q in self._children.values():
nursery.start_soon(wait_for_actor, subactor, proc, main_q)
await self.wait()
log.debug(f"Nursery teardown complete")
def current_actor() -> Actor:
"""Get the process-local actor instance.
"""
return _current_actor
@asynccontextmanager
async def open_nursery(supervisor=None, loglevel='WARNING'):
"""Create and yield a new ``ActorNursery``.
"""
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
# TODO: figure out supervisors from erlang
async with ActorNursery(current_actor(), supervisor) as nursery:
yield nursery
class NoArbiterFound(Exception):
"Couldn't find the arbiter?"
async def start_actor(actor, host, port, arbiter_addr, nursery=None):
"""Spawn a local actor by starting a task to execute it's main
async function.
Blocks if no nursery is provided, in which case it is expected the nursery
provider is responsible for waiting on the task to complete.
"""
# assign process-local actor
global _current_actor
_current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
log.info(f"Starting local {actor} @ {host}:{port}")
await actor._async_main(
accept_addr=(host, port),
parent_addr=None,
arbiter_addr=arbiter_addr,
nursery=nursery,
)
# XXX: If spawned locally, the actor is cancelled when this
# context is complete given that there are no more active
# peer channels connected to it.
if not actor._outlive_main:
actor.cancel_server()
# unset module state
_current_actor = None
log.info("Completed async main")
@asynccontextmanager
async def _connect_chan(host, port):
"""Attempt to connect to an arbiter's channel server.
Return the channel on success or None on failure.
"""
chan = Channel((host, port))
await chan.connect()
yield chan
await chan.aclose()
@asynccontextmanager
async def get_arbiter(host, port):
"""Return a portal instance connected to a local or remote
arbiter.
"""
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_arbiter:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor)
else:
async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal:
yield arb_portal
@asynccontextmanager
async def find_actor(
name,
arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port)
):
"""Ask the arbiter to find actor(s) by name.
Returns a sequence of unconnected portals for each matching actor
known to the arbiter (client code is expected to connect the portals).
"""
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
async with get_arbiter(*arbiter_sockaddr) as arb_portal:
sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just
# the first one we find
if sockaddrs:
sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield
async def _main(async_fn, args, kwargs, name, arbiter_addr):
"""Async entry point for ``tractor``.
"""
main = partial(async_fn, *args) if async_fn else None
arbiter_addr = (host, port) = arbiter_addr or (
_default_arbiter_host, _default_arbiter_port)
# make a temporary connection to see if an arbiter exists
arbiter_found = False
try:
async with _connect_chan(host, port):
arbiter_found = True
except OSError:
log.warn(f"No actor could be found @ {host}:{port}")
if arbiter_found: # we were able to connect to an arbiter
log.info(f"Arbiter seems to exist @ {host}:{port}")
# create a local actor and start up its main routine/task
actor = Actor(
name or 'anonymous',
main=main,
**kwargs
)
host, port = (_default_arbiter_host, 0)
else:
# start this local actor as the arbiter
actor = Arbiter(name or 'arbiter', main=main, **kwargs)
await start_actor(actor, host, port, arbiter_addr=arbiter_addr)
# Creates an internal nursery which shouldn't be cancelled even if
# the one opened below is (this is desirable because the arbiter should
# stay up until a re-election process has taken place - which is not
# implemented yet FYI).
def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs):
"""Run a trio-actor async function in process.
This is tractor's main entry and the start point for any async actor.
"""
return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr)

View File

@ -6,7 +6,7 @@ Launch with ``piker watch <watchlist name>``.
(Currently there's a bunch of questrade specific stuff in here) (Currently there's a bunch of questrade specific stuff in here)
""" """
from itertools import chain from itertools import chain
from types import ModuleType from types import ModuleType, AsyncGeneratorType
import trio import trio
from kivy.uix.boxlayout import BoxLayout from kivy.uix.boxlayout import BoxLayout
@ -319,7 +319,7 @@ async def update_quotes(
nursery: 'Nursery', nursery: 'Nursery',
brokermod: ModuleType, brokermod: ModuleType,
widgets: dict, widgets: dict,
client: 'Client', agen: AsyncGeneratorType,
symbol_data: dict, symbol_data: dict,
first_quotes: dict first_quotes: dict
): ):
@ -359,7 +359,7 @@ async def update_quotes(
grid.render_rows(cache) grid.render_rows(cache)
# core cell update loop # core cell update loop
async for quotes in client.aiter_recv(): # new quotes data only async for quotes in agen: # new quotes data only
for symbol, quote in quotes.items(): for symbol, quote in quotes.items():
record, displayable = brokermod.format_quote( record, displayable = brokermod.format_quote(
quote, symbol_data=symbol_data) quote, symbol_data=symbol_data)
@ -375,86 +375,96 @@ async def update_quotes(
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
async def run_kivy(root, nursery): async def _async_main(name, portal, tickers, brokermod, rate):
'''Trio-kivy entry point.
'''
await async_runTouchApp(root) # run kivy
nursery.cancel_scope.cancel() # cancel all other tasks that may be running
async def _async_main(name, client, tickers, brokermod, rate):
'''Launch kivy app + all other related tasks. '''Launch kivy app + all other related tasks.
This is started with cli command `piker watch`. This is started with cli command `piker watch`.
''' '''
# subscribe for tickers # subscribe for tickers (this performs a possible filtering
await client.send((brokermod.name, tickers)) # where invalid symbols are discarded)
# get initial symbol data (long term data including last days close price) sd = await portal.run(
# TODO: need something better this this toy protocol "piker.brokers.core", 'symbol_data',
sd = await client.recv() broker=brokermod.name, tickers=tickers)
async with trio.open_nursery() as nursery: # an async generator instance
# get first quotes response agen = await portal.run(
log.debug("Waiting on first quote...") "piker.brokers.core", 'start_quote_stream',
quotes = await client.recv() broker=brokermod.name, tickers=tickers)
first_quotes = [
brokermod.format_quote(quote, symbol_data=sd)[0]
for quote in quotes.values()]
if first_quotes[0].get('last') is None: try:
log.error("Broker API is down temporarily") async with trio.open_nursery() as nursery:
nursery.cancel_scope.cancel() # get first quotes response
return log.debug("Waiting on first quote...")
quotes = await agen.__anext__()
first_quotes = [
brokermod.format_quote(quote, symbol_data=sd)[0]
for quote in quotes.values()]
# build out UI if first_quotes[0].get('last') is None:
Window.set_title(f"watchlist: {name}\t(press ? for help)") log.error("Broker API is down temporarily")
Builder.load_string(_kv) nursery.cancel_scope.cancel()
box = BoxLayout(orientation='vertical', padding=5, spacing=5) return
# define bid-ask "stacked" cells # build out UI
# (TODO: needs some rethinking and renaming for sure) Window.set_title(f"watchlist: {name}\t(press ? for help)")
bidasks = brokermod._bidasks Builder.load_string(_kv)
box = BoxLayout(orientation='vertical', padding=5, spacing=5)
# add header row # define bid-ask "stacked" cells
headers = first_quotes[0].keys() # (TODO: needs some rethinking and renaming for sure)
header = Row( bidasks = brokermod._bidasks
{key: key for key in headers},
headers=headers,
bidasks=bidasks,
is_header_row=True,
size_hint=(1, None),
)
box.add_widget(header)
# build grid # add header row
grid = TickerTable( headers = first_quotes[0].keys()
cols=1, header = Row(
size_hint=(1, None), {key: key for key in headers},
) headers=headers,
for ticker_record in first_quotes: bidasks=bidasks,
grid.append_row(ticker_record, bidasks=bidasks) is_header_row=True,
# associate the col headers row with the ticker table even though size_hint=(1, None),
# they're technically wrapped separately in containing BoxLayout )
header.table = grid box.add_widget(header)
# mark the initial sorted column header as bold and underlined # build grid
sort_cell = header.get_cell(grid.sort_key) grid = TickerTable(
sort_cell.bold = sort_cell.underline = True cols=1,
grid.last_clicked_col_cell = sort_cell size_hint=(1, None),
)
for ticker_record in first_quotes:
grid.append_row(ticker_record, bidasks=bidasks)
# associate the col headers row with the ticker table even though
# they're technically wrapped separately in containing BoxLayout
header.table = grid
# set up a pager view for large ticker lists # mark the initial sorted column header as bold and underlined
grid.bind(minimum_height=grid.setter('height')) sort_cell = header.get_cell(grid.sort_key)
pager = PagerView(box, grid, nursery) sort_cell.bold = sort_cell.underline = True
box.add_widget(pager) grid.last_clicked_col_cell = sort_cell
widgets = { # set up a pager view for large ticker lists
# 'anchor': anchor, grid.bind(minimum_height=grid.setter('height'))
'root': box, pager = PagerView(box, grid, nursery)
'grid': grid, box.add_widget(pager)
'box': box,
'header': header, widgets = {
'pager': pager, # 'anchor': anchor,
} 'root': box,
nursery.start_soon(run_kivy, widgets['root'], nursery) 'grid': grid,
nursery.start_soon( 'box': box,
update_quotes, nursery, brokermod, widgets, client, sd, quotes) 'header': header,
'pager': pager,
}
nursery.start_soon(
update_quotes, nursery, brokermod, widgets, agen, sd, quotes)
# Trio-kivy entry point.
await async_runTouchApp(widgets['root']) # run kivy
await agen.aclose() # cancel aysnc gen call
finally:
# un-subscribe from symbols stream
await portal.run(
"piker.brokers.core", 'modify_quote_stream',
broker=brokermod.name, tickers=[])
# cancel GUI update task
nursery.cancel_scope.cancel()

View File

@ -0,0 +1,143 @@
"""
Actor model API testing
"""
import time
from functools import partial
import pytest
import trio
from piker import tractor
@pytest.fixture
def us_symbols():
return ['TSLA', 'AAPL', 'CGC', 'CRON']
@pytest.mark.trio
async def test_no_arbitter():
"""An arbitter must be established before any nurseries
can be created.
(In other words ``tractor.run`` must be used instead of ``trio.run`` as is
done by the ``pytest-trio`` plugin.)
"""
with pytest.raises(RuntimeError):
with tractor.open_nursery():
pass
def test_local_actor_async_func():
"""Verify a simple async function in-process.
"""
nums = []
async def print_loop():
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter
for i in range(10):
nums.append(i)
await trio.sleep(0.1)
start = time.time()
tractor.run(print_loop)
# ensure the sleeps were actually awaited
assert time.time() - start >= 1
assert nums == list(range(10))
# NOTE: this func must be defined at module level in order for the
# interal pickling infra of the forkserver to work
async def spawn(is_arbiter):
statespace = {'doggy': 10, 'kitty': 4}
namespaces = ['piker.brokers.core']
await trio.sleep(0.1)
actor = tractor.current_actor()
assert actor.is_arbiter == is_arbiter
# arbiter should always have an empty statespace as it's redundant
assert actor.statespace == statespace
if actor.is_arbiter:
async with tractor.open_nursery() as nursery:
# forks here
portal = await nursery.start_actor(
'sub-actor',
main=partial(spawn, False),
statespace=statespace,
rpc_module_paths=namespaces,
)
assert len(nursery._children) == 1
assert portal.channel.uid in tractor.current_actor()._peers
else:
return 10
def test_local_arbiter_subactor_global_state():
statespace = {'doggy': 10, 'kitty': 4}
tractor.run(
spawn,
True,
name='arbiter',
statespace=statespace,
)
async def rx_price_quotes_from_brokerd(us_symbols):
"""Verify we can spawn a daemon actor and retrieve streamed price data.
"""
async with tractor.find_actor('brokerd') as portals:
if not portals:
# only one per host address, spawns an actor if None
async with tractor.open_nursery() as nursery:
# no brokerd actor found
portal = await nursery.start_actor(
'brokerd',
rpc_module_paths=['piker.brokers.core'],
statespace={
'brokers2tickersubs': {},
'clients': {},
'dtasks': set()
},
main=None, # don't start a main func - use rpc
)
# gotta expose in a broker agnostic way...
# retrieve initial symbol data
# sd = await portal.run(
# 'piker.brokers.core', 'symbol_data', symbols=us_symbols)
# assert list(sd.keys()) == us_symbols
gen = await portal.run(
'piker.brokers.core',
'_test_price_stream',
broker='robinhood',
symbols=us_symbols,
)
# it'd sure be nice to have an asyncitertools here...
async for quotes in gen:
assert quotes
for key in quotes:
assert key in us_symbols
break
# terminate far-end async-gen
# await gen.asend(None)
# break
# stop all spawned subactors
await nursery.cancel()
# arbitter is cancelled here due to `find_actors()` internals
# (which internally uses `get_arbiter` which kills its channel
# server scope on exit)
def test_rx_price_quotes_from_brokerd(us_symbols):
tractor.run(
rx_price_quotes_from_brokerd,
us_symbols,
name='arbiter',
)