commit 6dbd919914

Pipfile
@@ -8,13 +8,15 @@ name = "pypi"
 Cython = "*"
 # matham's next-gen async port of kivy
 Kivy = {git = "git://github.com/matham/kivy.git", ref = "async-loop"}
+tractor = {git = "git://github.com/tgoodlet/tractor.git", ref = "master"}
 pdbpp = "*"
 msgpack = "*"
 trio = "*"

 [dev-packages]
 pytest = "*"
 pdbpp = "*"
-"e1839a8" = {path = ".", editable = true}
+piker = {editable = true, path = "."}

 [requires]
 python_version = "3.6"
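
The dependency changes mirror the code changes below: `tractor` now comes from its own repo instead of living in this one as `piker.tractor`, and the machine-generated `"e1839a8"` editable entry is replaced by an explicitly named `piker` one. A quick sanity check after re-locking (a sketch; the check itself is an assumption, not part of this commit):

    # run under `pipenv run python` once the env is re-synced: the import
    # should resolve to the external package, not to piker/tractor.py
    import tractor
    print(tractor.__file__)
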

Pipfile.lock
@@ -1,7 +1,7 @@
 {
     "_meta": {
         "hash": {
-            "sha256": "8d6521c10626550c727da281b89d0b6df5d31eb7dc5fe6e7aadf3de58bae44a7"
+            "sha256": "3ecdccfdf0abbb69730204c608319fcd7b151115a0e183dae6cb2c0748beeb58"
         },
         "pipfile-spec": 6,
         "requires": {
@@ -59,40 +59,37 @@
         },
         "cython": {
             "hashes": [
-                "sha256:0344e9352b0915910e212c38403b63f902ce1cba75dde7a43a9112ff960eb2a5",
-                "sha256:0a390c39e912fc5f82d5feae2d16ea061971407099e1efb0fecb255cb96fbeff",
-                "sha256:0f2b2e09f94c498f555935e732b7321b5f62f00e7a789238f6c5ddd66987a54d",
-                "sha256:15614592616b6dd5e919e158796350ebeba6cb6b5d2998cfff41b53f568c8355",
-                "sha256:1aae6d6e9858888144cea147eb5e677830f45faaff3d305d77378c3cba55f526",
-                "sha256:200583297f23e558744bc4688d8a2b2605ab6ad7d1494a9fd8c8094ad65ebf3c",
-                "sha256:295facc211a6b55db9979455b856180f2839be22ab767ffdea55986bee83ca9f",
-                "sha256:36c16bf39280fe857213d8da31c07a6179d3878c3dc2e435dce0974b9f8f0729",
-                "sha256:3fef8dfa9cf86ab7814ca31369374ddd5b9524f54406aa83b53b5937965b8e88",
-                "sha256:439d233d3214e3d69c033a9a93516758f2c8a03e83ea51ae14b6eed13687d224",
-                "sha256:455ab39c6c0849a6c008fcdf2fae42475f18d0801a3be229e8f75367bbe3b325",
-                "sha256:56821e3791209e6a11992e294afbf7e3dcda7d4fd54d06396dd521928d3d14fe",
-                "sha256:62b594584889b33bbea7e71f9d7c5c6539091b341334ef7ca1ae7e30a9dd3e15",
-                "sha256:70f81a75fb25c1c3c61843e3a6fe771a76c4ebf4d154455a7eff0740ad47dff4",
-                "sha256:8011090beb09251cb4ece1e14263e574b38eda696b788552b369ad343373d0e9",
-                "sha256:80d6a0369333a162fc32a22637f5870f3e87fb038c7b58860bbe00b05b58aa62",
-                "sha256:85b04e32af58a3c008c0ba8169017770aaa342a5972b748f81d043d66363e437",
-                "sha256:9c4db4cfc8ac219b50484a505e3327142db04d5e9954aaed00ab729ef4beb142",
-                "sha256:9ed273d82116fa148c92901b9639030e087979d455982bd7bf727fb486c0bd17",
-                "sha256:a1af59e6c9b4acc07c429d8495fc016a35e0a1270f28c57317352f512df7e214",
-                "sha256:b894ff4daf8dfaf657bf2d5e7190a4de11b2400b1e0fb0902974d35c23a26dea",
-                "sha256:c2659981150b4de04397dcfd4bff64e384d3ba25af60d1b22820fdf108298cb2",
-                "sha256:c347d0a129c9742fefeaecf2455576c5ae73362aa01a27cea26fac237b7e2a87",
-                "sha256:c981a750858f1727995acf861ab030b267d264ca6efda2f01104941187a3675f",
-                "sha256:cc4152b19ec168391f7815d24b70c8911829ba281bd5fcd98cab9dc21abe62ff",
-                "sha256:d0f5b1668e7f7f6fc9849f49a20c5db10562a0ab29cd66818894dfebbca7b304",
-                "sha256:d7152006ed1a3adb8f978077b57d237ddafa188240af53cd72b5c79e4ed000e3",
-                "sha256:e5f877472993474296125c22b84c334b550010815e513cccce73da854a132d64",
-                "sha256:e7c2c87ff2f99ed4be1bb046d6eddfb388af627928037f9e0a420c05daaf14ed",
-                "sha256:edd7d499685655031be5b4d33005096b6345f81eeb7ab9d2dd415db0c7bcf64e",
-                "sha256:f99a777fda569a88deea863eac2722b5e88957c4d5f4413949740da791857ac9"
+                "sha256:01487236575df8f17b46982071438dce4f7eaf8acc8fb99fca3510d343cd7a28",
+                "sha256:0671d17c7a27634d6819246e535241b951141ed0e3f6f2a6d618fd32344dae3e",
+                "sha256:0e6190d6971c46729f712dd7307a9c0a8c027bfa5b4d8f2edef106b01759926c",
+                "sha256:202587c754901d0678bd6ff89c707f099987928239049a528470c06c6c922cf8",
+                "sha256:345197ba9278cf6a914cb7421dc665a0531a219b0072abf6b0cebfdf68e75725",
+                "sha256:3a296b8d6b02f0e01ab04bedea658f43eef5ad2f8e586a820226ead1a677d9b1",
+                "sha256:484572a2b22823a967be106137a93f7d634db116b3f7accb37dbd760eda2fa9f",
+                "sha256:4c67c9c803e50ceff32cc5e4769c50fc8ae8df9c4e5cc592ce8310b5a1076d23",
+                "sha256:539038087c321911745fc2e77049209b1231300d481cb4d682b2f95c724814b3",
+                "sha256:58113e0683c3688594c112103d7e9f2d0092fd2d8297a220240bea22e184dfdd",
+                "sha256:65cb25ca4284804293a2404d1be3b5a98818be21a72791649bacbcfa4e431d41",
+                "sha256:699e765da2580e34b08473fc0acef3a2d7bcb7f13eb29401cd25236bcf000080",
+                "sha256:6b54c3470810cea49a8be90814d05c5325ceb9c5bf429fd86c36fc1b32dfc157",
+                "sha256:71ac1629e4eae2ed329be8caf45efea10bfe1af3d8767e12e64b83e4ea5a3250",
+                "sha256:722c179d3df8677f3daf45b1a2764678ed4f0aaddbaa7211a8a08ebfd907c0db",
+                "sha256:76ac2b08d3d956d77b574bb43cbf1d37bd58b9d50c04ba281303e695854ebc46",
+                "sha256:7eff1157be9e26bf7494288c89979ca69d593a009e2c7420a739e2cf1e0635f5",
+                "sha256:99546c8696d27d0efa639c77b2f8af6e61dc3a5073caae4f27ffd991ca926f42",
+                "sha256:a0c263b31d335f29c11f4a9e98fbcd908d0731d4ea99bfd27c1c47caaeb4ca2e",
+                "sha256:a29c66292605bff962adc26530c030607aa699206b12dfb84f131b0454e15df4",
+                "sha256:a4d3724c5a1ddd86d7d830d8e02c40151839b833791dd4b6fe9e144380fa7d37",
+                "sha256:aed9f33b19d542eea56c38ef3862ca56147f7903648156cd57eabb0fe47c35d6",
+                "sha256:b57e733dd8871d2cc7358c2e0fe33027453afffbcd0ea6a537f54877cad5131c",
+                "sha256:d5bf4db62236e82955c40bafbaa18d54b20b5ceefa06fb57c7facc443929f4bd",
+                "sha256:d9272dd71ab78e87fa34a0a59bbd6acc9a9c0005c834a6fc8457ff9619dc6795",
+                "sha256:e9d5671bcbb90a41b0832fcb3872fcbaca3d68ff11ea09724dd6cbdf31d947fb",
+                "sha256:ee54646afb2b73b293c94cf079682d18d404ebd6c01122dc3980f111aec2d8ae",
+                "sha256:f16a87197939977824609005b73f9ebb291b9653a14e5f27afc1c5d6f981ba39"
             ],
             "index": "pypi",
-            "version": "==0.28.3"
+            "version": "==0.28.4"
         },
         "e1839a8": {
             "editable": true,
@@ -173,45 +170,54 @@
                 "sha256:2df854df882d322d5c23087a4959e145b953dfff2abe1774fec4f639ac2f3160",
                 "sha256:381ad13c30cd1d0b2f3da8a0c1a4aa697487e8bb0e9e0cbeb7439776bcb645f8",
                 "sha256:385f1ce46e08676505b692bfde918c1e0b350963a15ef52d77691c2cf0f5dbf6",
                 "sha256:4130e5ae16c656b7de654dc5e595cfeb85d3a4b0bb0734d19c0dce6dc7ee0e07",
                 "sha256:4d278c2261be6423c5e63d8f0ceb1b0c6db3ff83f2906f4b860db6ae99ca1bb5",
                 "sha256:51c5dcb51cf88b34b7d04c15f600b07c6ccbb73a089a38af2ab83c02862318da",
                 "sha256:589336ba5199c8061239cf446ee2f2f1fcc0c68e8531ee1382b6fc0c66b2d388",
                 "sha256:5ae3564cb630e155a650f4f9c054589848e97836bebae5637240a0d8099f817b",
                 "sha256:5edf1acc827ed139086af95ce4449b7b664f57a8c29eb755411a634be280d9f2",
                 "sha256:6b82b81c6b3b70ed40bc6d0b71222ebfcd6b6c04a6e7945a936e514b9113d5a3",
                 "sha256:6c57f973218b776195d0356e556ec932698f3a563e2f640cfca7020086383f50",
                 "sha256:758d1091a501fd2d75034e55e7e98bfd1370dc089160845c242db1c760d944d9",
                 "sha256:8622db292b766719810e0cb0f62ef6141e15fe32b04e4eb2959888319e59336b",
                 "sha256:8b8dcfcd630f1981f0f1e3846fae883376762a0c1b472baa35b145b911683b7b",
                 "sha256:91fdd510743ae4df862dbd51a4354519dd9fb8941347526cd9c2194b792b3da9",
                 "sha256:97fa8f1dceffab782069b291e38c4c2227f255cdac5f1e3346666931df87373e",
                 "sha256:9b705f18b26fb551366ab6347ba9941b62272bf71c6bbcadcd8af94d10535241",
                 "sha256:9d69967673ab7b028c2df09cae05ba56bf4e39e3cb04ebe452b6035c3b49848e",
                 "sha256:9e1f53afae865cc32459ad211493cf9e2a3651a7295b7a38654ef3d123808996",
                 "sha256:a4a433b3a264dbc9aa9c7c241e87c0358a503ea6394f8737df1683c7c9a102ac",
                 "sha256:baadc5f770917ada556afb7651a68176559f4dca5f4b2d0947cd15b9fb84fb51",
                 "sha256:c725d11990a9243e6ceffe0ab25a07c46c1cc2c5dc55e305717b5afe856c9608",
                 "sha256:d696a8c87315a83983fc59dd27efe034292b9e8ad667aeae51a68b4be14690d9",
-                "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418"
+                "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418",
+                "sha256:e1d18421a7e2ad4a655b76e65d549d4159f8874c18a417464c1d439ee7ccc7cd"
             ],
             "version": "==1.14.5"
         },
         "pandas": {
             "hashes": [
-                "sha256:211cfdb9f72f26d2ede21c751d27e08fed4434d47fb9bb82ebc8ff753888b8b6",
-                "sha256:28fd087514616549a0e3259cd68ac88d7eaed6bd3062017a7f312e27941266bd",
-                "sha256:2fb7c63138bd5ead296b18b2cb6abd3a394f7581e5ae052b02b27df8244b03ca",
-                "sha256:372435456c349a8d39ff001967b161f6bd29d4c3de145a4cf9b366648defbb1f",
-                "sha256:3790a3348ab0f416e58061d21693cb662fbb2f638001b94bf2b2199fedc1b1c2",
-                "sha256:437a6e906a6717a9ed2627cf6e7895b63dfaa0172567cbd75a553f55cf78cc17",
-                "sha256:50b52af2af2e15f4aeb2fe196da073a8c131fa02e433e105d95ce40016df5690",
-                "sha256:720daad75b5d35dd1b446842210c4f3fd447464c9c0884972f3f12b213a9edd1",
-                "sha256:b4fb71acbc2709b8f5993cb4b5445d8182864f11c39787e317aae39f21206270",
-                "sha256:b704fd73022342cce612996de495a16954311e0c0cf077c1b83d5cf0b9656a60",
-                "sha256:cbbecca0c7af6a2160b2d6ba30becc286824a98c61dcc6a41fada664f226424c",
-                "sha256:d2a071de755cc8ee7784e1b4c7b9b643d951d35c8adea7d64fe7c57cff9c47a7",
-                "sha256:d8154c5c68713a82461aba735832f0b4692be8a45a0a340a303bf90d6f80f36f",
-                "sha256:e1b86f7c55467ce1f6c12715f2fd1817f4a909b5c8c39bd4b5d2415ef2b04bd8",
-                "sha256:fcc63e8134516e93e16eb4ceac9afaa51f4adc5bf58efddae7cbc562f5b77dd0"
+                "sha256:05ac350f8a35abe6a02054f8cf54e0c048f13423b2acb87d018845afd736f0b4",
+                "sha256:174543cd68eaee60620146b38faaed950071f5665e0a4fa4adfdcfc23d7f7936",
+                "sha256:1a62a237fb7223c11d09daaeaf7d15f234bb836bfaf3d4f85746cdf9b2582f99",
+                "sha256:2c1ed1de5308918a7c6833df6db75a19c416c122921824e306c64a0626b3606c",
+                "sha256:33825ad26ce411d6526f903b3d02c0edf627223af59cf4b5876aa925578eec74",
+                "sha256:4c5f76fce8a4851f65374ea1d95ca24e9439540550e41e556c0879379517a6f5",
+                "sha256:67504a96f72fb4d7f051cfe77b9a7bb0d094c4e2e5a6efb2769eb80f36e6b309",
+                "sha256:683e0cc8c7faececbbc06aa4735709a07abad106099f165730c1015da916adec",
+                "sha256:77cd1b485c6a860b950ab3a85be7b5683eaacbc51cadf096db967886607d2231",
+                "sha256:814f8785f1ab412a7e9b9a8abb81dfe8727ebdeef850ecfaa262c04b1664000f",
+                "sha256:894216edaf7dd0a92623cdad423bbec2a23fc06eb9c85483e21876d1ef8f47e9",
+                "sha256:9331e20a07360b81d8c7b4b50223da387d264151d533a5a5853325800e6631a4",
+                "sha256:9cd3614b4e31a0889388ff1bd19ae857ad52658b33f776065793c293a29cf612",
+                "sha256:9d79e958adcd037eba3debbb66222804171197c0f5cd462315d1356aa72a5a30",
+                "sha256:b90e5d5460f23607310cbd1688a7517c96ce7b284095a48340d249dfc429172e",
+                "sha256:bc80c13ffddc7e269b706ed58002cc4c98cc135c36d827c99fb5ca54ced0eb7a",
+                "sha256:cbb074efb2a5e4956b261a670bfc2126b0ccfbf5b96b6ed021bc8c8cb56cf4a8",
+                "sha256:e8c62ab16feeda84d4732c42b7b67d7a89ad89df7e99efed80ea017bdc472f26",
+                "sha256:ff5ef271805fe877fe0d1337b6b1861113c44c75b9badb595c713a72cd337371"
             ],
-            "version": "==0.23.1"
+            "version": "==0.23.3"
         },
         "pdbpp": {
             "hashes": [
@@ -236,10 +242,10 @@
         },
         "pytz": {
             "hashes": [
-                "sha256:65ae0c8101309c45772196b21b74c46b2e5d11b6275c45d251b150d5da334555",
-                "sha256:c06425302f2cf668f1bba7a0a03f3c1d34d4ebeef2c72003da308b3947c7f749"
+                "sha256:a061aa0a9e06881eb8b3b2b43f05b9439d6583c206d0a6c340ff72a7b6669053",
+                "sha256:ffb9ef1de172603304d9d2819af6f5ece76f2e85ec10692a524dd876e72bf277"
             ],
-            "version": "==2018.4"
+            "version": "==2018.5"
         },
         "six": {
             "hashes": [
@@ -255,11 +261,16 @@
             ],
             "version": "==2.0.4"
         },
+        "tractor": {
+            "git": "git://github.com/tgoodlet/tractor.git",
+            "ref": "f726bd81da3a9f9f0754ffa26c92edb3c3a68608"
+        },
         "trio": {
             "hashes": [
                 "sha256:0496f575ed118eb382346a728971766d54b3a3e39e9825e94d4d513b0fe96145",
                 "sha256:fc5513551d22ec2be8bd05ddd56e9c3c377ac47c79a3866fa2d8710bfae4a0cb"
             ],
             "index": "pypi",
             "version": "==0.4.0"
         },
         "wmctrl": {
@@ -320,44 +331,37 @@
         },
         "cython": {
             "hashes": [
-                "sha256:0344e9352b0915910e212c38403b63f902ce1cba75dde7a43a9112ff960eb2a5",
-                "sha256:0a390c39e912fc5f82d5feae2d16ea061971407099e1efb0fecb255cb96fbeff",
-                "sha256:0f2b2e09f94c498f555935e732b7321b5f62f00e7a789238f6c5ddd66987a54d",
-                "sha256:15614592616b6dd5e919e158796350ebeba6cb6b5d2998cfff41b53f568c8355",
-                "sha256:1aae6d6e9858888144cea147eb5e677830f45faaff3d305d77378c3cba55f526",
-                "sha256:200583297f23e558744bc4688d8a2b2605ab6ad7d1494a9fd8c8094ad65ebf3c",
-                "sha256:295facc211a6b55db9979455b856180f2839be22ab767ffdea55986bee83ca9f",
-                "sha256:36c16bf39280fe857213d8da31c07a6179d3878c3dc2e435dce0974b9f8f0729",
-                "sha256:3fef8dfa9cf86ab7814ca31369374ddd5b9524f54406aa83b53b5937965b8e88",
-                "sha256:439d233d3214e3d69c033a9a93516758f2c8a03e83ea51ae14b6eed13687d224",
-                "sha256:455ab39c6c0849a6c008fcdf2fae42475f18d0801a3be229e8f75367bbe3b325",
-                "sha256:56821e3791209e6a11992e294afbf7e3dcda7d4fd54d06396dd521928d3d14fe",
-                "sha256:62b594584889b33bbea7e71f9d7c5c6539091b341334ef7ca1ae7e30a9dd3e15",
-                "sha256:70f81a75fb25c1c3c61843e3a6fe771a76c4ebf4d154455a7eff0740ad47dff4",
-                "sha256:8011090beb09251cb4ece1e14263e574b38eda696b788552b369ad343373d0e9",
-                "sha256:80d6a0369333a162fc32a22637f5870f3e87fb038c7b58860bbe00b05b58aa62",
-                "sha256:85b04e32af58a3c008c0ba8169017770aaa342a5972b748f81d043d66363e437",
-                "sha256:9c4db4cfc8ac219b50484a505e3327142db04d5e9954aaed00ab729ef4beb142",
-                "sha256:9ed273d82116fa148c92901b9639030e087979d455982bd7bf727fb486c0bd17",
-                "sha256:a1af59e6c9b4acc07c429d8495fc016a35e0a1270f28c57317352f512df7e214",
-                "sha256:b894ff4daf8dfaf657bf2d5e7190a4de11b2400b1e0fb0902974d35c23a26dea",
-                "sha256:c2659981150b4de04397dcfd4bff64e384d3ba25af60d1b22820fdf108298cb2",
-                "sha256:c347d0a129c9742fefeaecf2455576c5ae73362aa01a27cea26fac237b7e2a87",
-                "sha256:c981a750858f1727995acf861ab030b267d264ca6efda2f01104941187a3675f",
-                "sha256:cc4152b19ec168391f7815d24b70c8911829ba281bd5fcd98cab9dc21abe62ff",
-                "sha256:d0f5b1668e7f7f6fc9849f49a20c5db10562a0ab29cd66818894dfebbca7b304",
-                "sha256:d7152006ed1a3adb8f978077b57d237ddafa188240af53cd72b5c79e4ed000e3",
-                "sha256:e5f877472993474296125c22b84c334b550010815e513cccce73da854a132d64",
-                "sha256:e7c2c87ff2f99ed4be1bb046d6eddfb388af627928037f9e0a420c05daaf14ed",
-                "sha256:edd7d499685655031be5b4d33005096b6345f81eeb7ab9d2dd415db0c7bcf64e",
-                "sha256:f99a777fda569a88deea863eac2722b5e88957c4d5f4413949740da791857ac9"
+                "sha256:01487236575df8f17b46982071438dce4f7eaf8acc8fb99fca3510d343cd7a28",
+                "sha256:0671d17c7a27634d6819246e535241b951141ed0e3f6f2a6d618fd32344dae3e",
+                "sha256:0e6190d6971c46729f712dd7307a9c0a8c027bfa5b4d8f2edef106b01759926c",
+                "sha256:202587c754901d0678bd6ff89c707f099987928239049a528470c06c6c922cf8",
+                "sha256:345197ba9278cf6a914cb7421dc665a0531a219b0072abf6b0cebfdf68e75725",
+                "sha256:3a296b8d6b02f0e01ab04bedea658f43eef5ad2f8e586a820226ead1a677d9b1",
+                "sha256:484572a2b22823a967be106137a93f7d634db116b3f7accb37dbd760eda2fa9f",
+                "sha256:4c67c9c803e50ceff32cc5e4769c50fc8ae8df9c4e5cc592ce8310b5a1076d23",
+                "sha256:539038087c321911745fc2e77049209b1231300d481cb4d682b2f95c724814b3",
+                "sha256:58113e0683c3688594c112103d7e9f2d0092fd2d8297a220240bea22e184dfdd",
+                "sha256:65cb25ca4284804293a2404d1be3b5a98818be21a72791649bacbcfa4e431d41",
+                "sha256:699e765da2580e34b08473fc0acef3a2d7bcb7f13eb29401cd25236bcf000080",
+                "sha256:6b54c3470810cea49a8be90814d05c5325ceb9c5bf429fd86c36fc1b32dfc157",
+                "sha256:71ac1629e4eae2ed329be8caf45efea10bfe1af3d8767e12e64b83e4ea5a3250",
+                "sha256:722c179d3df8677f3daf45b1a2764678ed4f0aaddbaa7211a8a08ebfd907c0db",
+                "sha256:76ac2b08d3d956d77b574bb43cbf1d37bd58b9d50c04ba281303e695854ebc46",
+                "sha256:7eff1157be9e26bf7494288c89979ca69d593a009e2c7420a739e2cf1e0635f5",
+                "sha256:99546c8696d27d0efa639c77b2f8af6e61dc3a5073caae4f27ffd991ca926f42",
+                "sha256:a0c263b31d335f29c11f4a9e98fbcd908d0731d4ea99bfd27c1c47caaeb4ca2e",
+                "sha256:a29c66292605bff962adc26530c030607aa699206b12dfb84f131b0454e15df4",
+                "sha256:a4d3724c5a1ddd86d7d830d8e02c40151839b833791dd4b6fe9e144380fa7d37",
+                "sha256:aed9f33b19d542eea56c38ef3862ca56147f7903648156cd57eabb0fe47c35d6",
+                "sha256:b57e733dd8871d2cc7358c2e0fe33027453afffbcd0ea6a537f54877cad5131c",
+                "sha256:d5bf4db62236e82955c40bafbaa18d54b20b5ceefa06fb57c7facc443929f4bd",
+                "sha256:d9272dd71ab78e87fa34a0a59bbd6acc9a9c0005c834a6fc8457ff9619dc6795",
+                "sha256:e9d5671bcbb90a41b0832fcb3872fcbaca3d68ff11ea09724dd6cbdf31d947fb",
+                "sha256:ee54646afb2b73b293c94cf079682d18d404ebd6c01122dc3980f111aec2d8ae",
+                "sha256:f16a87197939977824609005b73f9ebb291b9653a14e5f27afc1c5d6f981ba39"
             ],
             "index": "pypi",
-            "version": "==0.28.3"
-        },
-        "e1839a8": {
-            "editable": true,
-            "path": "."
+            "version": "==0.28.4"
         },
         "fancycompleter": {
             "hashes": [
@@ -438,45 +442,54 @@
                 "sha256:2df854df882d322d5c23087a4959e145b953dfff2abe1774fec4f639ac2f3160",
                 "sha256:381ad13c30cd1d0b2f3da8a0c1a4aa697487e8bb0e9e0cbeb7439776bcb645f8",
                 "sha256:385f1ce46e08676505b692bfde918c1e0b350963a15ef52d77691c2cf0f5dbf6",
                 "sha256:4130e5ae16c656b7de654dc5e595cfeb85d3a4b0bb0734d19c0dce6dc7ee0e07",
                 "sha256:4d278c2261be6423c5e63d8f0ceb1b0c6db3ff83f2906f4b860db6ae99ca1bb5",
                 "sha256:51c5dcb51cf88b34b7d04c15f600b07c6ccbb73a089a38af2ab83c02862318da",
                 "sha256:589336ba5199c8061239cf446ee2f2f1fcc0c68e8531ee1382b6fc0c66b2d388",
                 "sha256:5ae3564cb630e155a650f4f9c054589848e97836bebae5637240a0d8099f817b",
                 "sha256:5edf1acc827ed139086af95ce4449b7b664f57a8c29eb755411a634be280d9f2",
                 "sha256:6b82b81c6b3b70ed40bc6d0b71222ebfcd6b6c04a6e7945a936e514b9113d5a3",
                 "sha256:6c57f973218b776195d0356e556ec932698f3a563e2f640cfca7020086383f50",
                 "sha256:758d1091a501fd2d75034e55e7e98bfd1370dc089160845c242db1c760d944d9",
                 "sha256:8622db292b766719810e0cb0f62ef6141e15fe32b04e4eb2959888319e59336b",
                 "sha256:8b8dcfcd630f1981f0f1e3846fae883376762a0c1b472baa35b145b911683b7b",
                 "sha256:91fdd510743ae4df862dbd51a4354519dd9fb8941347526cd9c2194b792b3da9",
                 "sha256:97fa8f1dceffab782069b291e38c4c2227f255cdac5f1e3346666931df87373e",
                 "sha256:9b705f18b26fb551366ab6347ba9941b62272bf71c6bbcadcd8af94d10535241",
                 "sha256:9d69967673ab7b028c2df09cae05ba56bf4e39e3cb04ebe452b6035c3b49848e",
                 "sha256:9e1f53afae865cc32459ad211493cf9e2a3651a7295b7a38654ef3d123808996",
                 "sha256:a4a433b3a264dbc9aa9c7c241e87c0358a503ea6394f8737df1683c7c9a102ac",
                 "sha256:baadc5f770917ada556afb7651a68176559f4dca5f4b2d0947cd15b9fb84fb51",
                 "sha256:c725d11990a9243e6ceffe0ab25a07c46c1cc2c5dc55e305717b5afe856c9608",
                 "sha256:d696a8c87315a83983fc59dd27efe034292b9e8ad667aeae51a68b4be14690d9",
-                "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418"
+                "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418",
+                "sha256:e1d18421a7e2ad4a655b76e65d549d4159f8874c18a417464c1d439ee7ccc7cd"
             ],
             "version": "==1.14.5"
         },
         "pandas": {
             "hashes": [
-                "sha256:211cfdb9f72f26d2ede21c751d27e08fed4434d47fb9bb82ebc8ff753888b8b6",
-                "sha256:28fd087514616549a0e3259cd68ac88d7eaed6bd3062017a7f312e27941266bd",
-                "sha256:2fb7c63138bd5ead296b18b2cb6abd3a394f7581e5ae052b02b27df8244b03ca",
-                "sha256:372435456c349a8d39ff001967b161f6bd29d4c3de145a4cf9b366648defbb1f",
-                "sha256:3790a3348ab0f416e58061d21693cb662fbb2f638001b94bf2b2199fedc1b1c2",
-                "sha256:437a6e906a6717a9ed2627cf6e7895b63dfaa0172567cbd75a553f55cf78cc17",
-                "sha256:50b52af2af2e15f4aeb2fe196da073a8c131fa02e433e105d95ce40016df5690",
-                "sha256:720daad75b5d35dd1b446842210c4f3fd447464c9c0884972f3f12b213a9edd1",
-                "sha256:b4fb71acbc2709b8f5993cb4b5445d8182864f11c39787e317aae39f21206270",
-                "sha256:b704fd73022342cce612996de495a16954311e0c0cf077c1b83d5cf0b9656a60",
-                "sha256:cbbecca0c7af6a2160b2d6ba30becc286824a98c61dcc6a41fada664f226424c",
-                "sha256:d2a071de755cc8ee7784e1b4c7b9b643d951d35c8adea7d64fe7c57cff9c47a7",
-                "sha256:d8154c5c68713a82461aba735832f0b4692be8a45a0a340a303bf90d6f80f36f",
-                "sha256:e1b86f7c55467ce1f6c12715f2fd1817f4a909b5c8c39bd4b5d2415ef2b04bd8",
-                "sha256:fcc63e8134516e93e16eb4ceac9afaa51f4adc5bf58efddae7cbc562f5b77dd0"
+                "sha256:05ac350f8a35abe6a02054f8cf54e0c048f13423b2acb87d018845afd736f0b4",
+                "sha256:174543cd68eaee60620146b38faaed950071f5665e0a4fa4adfdcfc23d7f7936",
+                "sha256:1a62a237fb7223c11d09daaeaf7d15f234bb836bfaf3d4f85746cdf9b2582f99",
+                "sha256:2c1ed1de5308918a7c6833df6db75a19c416c122921824e306c64a0626b3606c",
+                "sha256:33825ad26ce411d6526f903b3d02c0edf627223af59cf4b5876aa925578eec74",
+                "sha256:4c5f76fce8a4851f65374ea1d95ca24e9439540550e41e556c0879379517a6f5",
+                "sha256:67504a96f72fb4d7f051cfe77b9a7bb0d094c4e2e5a6efb2769eb80f36e6b309",
+                "sha256:683e0cc8c7faececbbc06aa4735709a07abad106099f165730c1015da916adec",
+                "sha256:77cd1b485c6a860b950ab3a85be7b5683eaacbc51cadf096db967886607d2231",
+                "sha256:814f8785f1ab412a7e9b9a8abb81dfe8727ebdeef850ecfaa262c04b1664000f",
+                "sha256:894216edaf7dd0a92623cdad423bbec2a23fc06eb9c85483e21876d1ef8f47e9",
+                "sha256:9331e20a07360b81d8c7b4b50223da387d264151d533a5a5853325800e6631a4",
+                "sha256:9cd3614b4e31a0889388ff1bd19ae857ad52658b33f776065793c293a29cf612",
+                "sha256:9d79e958adcd037eba3debbb66222804171197c0f5cd462315d1356aa72a5a30",
+                "sha256:b90e5d5460f23607310cbd1688a7517c96ce7b284095a48340d249dfc429172e",
+                "sha256:bc80c13ffddc7e269b706ed58002cc4c98cc135c36d827c99fb5ca54ced0eb7a",
+                "sha256:cbb074efb2a5e4956b261a670bfc2126b0ccfbf5b96b6ed021bc8c8cb56cf4a8",
+                "sha256:e8c62ab16feeda84d4732c42b7b67d7a89ad89df7e99efed80ea017bdc472f26",
+                "sha256:ff5ef271805fe877fe0d1337b6b1861113c44c75b9badb595c713a72cd337371"
             ],
-            "version": "==0.23.1"
+            "version": "==0.23.3"
         },
         "pdbpp": {
             "hashes": [
@@ -509,11 +522,11 @@
         },
         "pytest": {
             "hashes": [
-                "sha256:8ea01fc4fcc8e1b1e305252b4bc80a1528019ab99fd3b88666c9dc38d754406c",
-                "sha256:90898786b3d0b880b47645bae7b51aa9bbf1e9d1e4510c2cfd15dd65c70ea0cd"
+                "sha256:0453c8676c2bee6feb0434748b068d5510273a916295fd61d306c4f22fbfd752",
+                "sha256:4b208614ae6d98195430ad6bde03641c78553acee7c83cec2e85d613c0cd383d"
            ],
             "index": "pypi",
-            "version": "==3.6.2"
+            "version": "==3.6.3"
         },
         "python-dateutil": {
             "hashes": [
@@ -524,10 +537,10 @@
         },
         "pytz": {
             "hashes": [
-                "sha256:65ae0c8101309c45772196b21b74c46b2e5d11b6275c45d251b150d5da334555",
-                "sha256:c06425302f2cf668f1bba7a0a03f3c1d34d4ebeef2c72003da308b3947c7f749"
+                "sha256:a061aa0a9e06881eb8b3b2b43f05b9439d6583c206d0a6c340ff72a7b6669053",
+                "sha256:ffb9ef1de172603304d9d2819af6f5ece76f2e85ec10692a524dd876e72bf277"
             ],
-            "version": "==2018.4"
+            "version": "==2018.5"
         },
         "six": {
             "hashes": [
@@ -548,6 +561,7 @@
                 "sha256:0496f575ed118eb382346a728971766d54b3a3e39e9825e94d4d513b0fe96145",
                 "sha256:fc5513551d22ec2be8bd05ddd56e9c3c377ac47c79a3866fa2d8710bfae4a0cb"
             ],
+            "index": "pypi",
             "version": "==0.4.0"
         },
         "wmctrl": {

@@ -9,10 +9,9 @@ from types import ModuleType
 from typing import Coroutine, Callable

 import trio
+import tractor

-from .. import tractor
-from ..log import get_logger
-from ..ipc import Channel
+from ..log import get_logger, get_console_log
 from . import get_brokermod

@@ -75,7 +74,7 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict:
 async def stream_quotes(
     brokermod: ModuleType,
     get_quotes: Coroutine,
-    tickers2chans: {str: Channel},
+    tickers2chans: {str: tractor.Channel},
     rate: int = 5,  # delay between quote requests
     diff_cached: bool = True,  # only deliver "new" quotes to the queue
     cid: str = None,
@@ -246,8 +245,10 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None):
         lambda ticker: ticker not in tickers, tickers2chans.copy()
     ):
         chanset = tickers2chans.get(ticker)
         if chanset:
-            chanset.discard((chan, cid))
+            # XXX: cid will be different on unsub call
+            for item in chanset.copy():
+                if chan in item:
+                    chanset.discard(item)

         if not chanset:
             # pop empty sets which will trigger bg quoter task termination
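
`tickers2chans` maps each ticker to a set of `(channel, cid)` pairs. The unsubscribe request arrives with a fresh caller id, so the old `discard((chan, cid))` never matched an existing entry; the new loop drops every pair belonging to the requesting channel regardless of `cid`. The set mechanics in isolation (a sketch with stand-in string values):

    chanset = {('chan_a', 'cid1'), ('chan_a', 'cid2'), ('chan_b', 'cid3')}

    # old behaviour: a silent no-op when the unsub call carries a new cid
    chanset.discard(('chan_a', 'cid9'))
    assert len(chanset) == 3

    # new behaviour: match on the channel alone
    for item in chanset.copy():
        if 'chan_a' in item:
            chanset.discard(item)
    assert chanset == {('chan_b', 'cid3')}
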
@@ -257,7 +258,7 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None):
 async def start_quote_stream(
     broker: str,
     tickers: [str],
-    chan: 'Channel' = None,
+    chan: tractor.Channel = None,
     cid: str = None,
 ) -> None:
     """Handle per-broker quote stream subscriptions.
@@ -266,8 +267,11 @@ async def start_quote_stream(
     Since most brokers seem to support batch quote requests we
     limit to one task per process for now.
     """
+    actor = tractor.current_actor()
+    # set log level after fork
+    get_console_log(actor.loglevel)
     # pull global vars from local actor
-    ss = tractor.current_actor().statespace
+    ss = actor.statespace
     broker2tickersubs = ss['broker2tickersubs']
     clients = ss['clients']
     dtasks = ss['dtasks']
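
`statespace` is the per-actor bag of process-global state: `pikerd` seeds it when spawning the broker daemon and rpc endpoints such as `start_quote_stream` read it back after the fork. A sketch of what the seeded dict plausibly looks like, inferred from the three keys read above (the value types are assumptions):

    statespace = {
        'broker2tickersubs': {},  # broker -> {ticker -> set of (chan, cid)}
        'clients': {},            # broker -> connected broker-client sessions
        'dtasks': set(),          # currently running daemon quoter tasks
    }
    # handed to the subactor at spawn time, then read back inside rpc
    # functions via tractor.current_actor().statespace
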
@@ -8,11 +8,11 @@ import os
 import click
 import pandas as pd
 import trio
+import tractor

 from . import watchlists as wl
 from .brokers import core, get_brokermod
 from .log import get_console_log, colorize_json, get_logger
-from . import tractor

 log = get_logger('cli')
 DEFAULT_BROKER = 'robinhood'
@@ -38,6 +38,7 @@ def pikerd(loglevel, host):
         outlive_main=True,  # run daemon forever
         rpc_module_paths=['piker.brokers.core'],
         name='brokerd',
+        loglevel=loglevel,
     )

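
The new `loglevel=loglevel` argument pairs with the `get_console_log(actor.loglevel)` call added to `start_quote_stream` above: the daemon runs in a freshly forked process, so the console logger has to be re-installed there, and the requested level rides along as actor state. A sketch of the spawn side using the nursery API from the (now external) tractor code shown later in this commit (the exact call site is assumed):

    async with tractor.open_nursery() as nursery:
        portal = await nursery.start_actor(
            'brokerd',
            rpc_module_paths=['piker.brokers.core'],
            outlive_main=True,   # run daemon forever
            loglevel='info',     # re-applied in the child after fork
        )
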

piker/ipc.py (deleted)

@@ -1,191 +0,0 @@
"""
|
||||
Inter-process comms abstractions
|
||||
"""
|
||||
from typing import Coroutine, Tuple
|
||||
|
||||
import msgpack
|
||||
import trio
|
||||
|
||||
from .log import get_logger
|
||||
log = get_logger('ipc')
|
||||
|
||||
|
||||
class StreamQueue:
|
||||
"""Stream wrapped as a queue that delivers ``msgpack`` serialized objects.
|
||||
"""
|
||||
def __init__(self, stream):
|
||||
self.stream = stream
|
||||
self._agen = self._iter_packets()
|
||||
self._laddr = self.stream.socket.getsockname()[:2]
|
||||
self._raddr = self.stream.socket.getpeername()[:2]
|
||||
self._send_lock = trio.Lock()
|
||||
|
||||
async def _iter_packets(self):
|
||||
"""Yield packets from the underlying stream.
|
||||
"""
|
||||
unpacker = msgpack.Unpacker(raw=False, use_list=False)
|
||||
while True:
|
||||
try:
|
||||
data = await self.stream.receive_some(2**10)
|
||||
log.trace(f"received {data}")
|
||||
except trio.BrokenStreamError:
|
||||
log.error(f"Stream connection {self.raddr} broke")
|
||||
return
|
||||
|
||||
if data == b'':
|
||||
log.debug(f"Stream connection {self.raddr} was closed")
|
||||
return
|
||||
|
||||
unpacker.feed(data)
|
||||
for packet in unpacker:
|
||||
yield packet
|
||||
|
||||
@property
|
||||
def laddr(self):
|
||||
return self._laddr
|
||||
|
||||
@property
|
||||
def raddr(self):
|
||||
return self._raddr
|
||||
|
||||
async def put(self, data):
|
||||
async with self._send_lock:
|
||||
return await self.stream.send_all(
|
||||
msgpack.dumps(data, use_bin_type=True))
|
||||
|
||||
async def get(self):
|
||||
return await self._agen.asend(None)
|
||||
|
||||
async def __aiter__(self):
|
||||
return self._agen
|
||||
|
||||
def connected(self):
|
||||
return self.stream.socket.fileno() != -1
|
||||
|
||||
|
||||
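
`_iter_packets` is the standard incremental-decode pattern: raw bytes are fed to a streaming `msgpack.Unpacker` and complete messages are drained as they become available, no matter how the TCP layer splits them. The same pattern stripped of trio and sockets (a standalone sketch):

    import msgpack

    # two messages serialized back-to-back, then delivered in arbitrary
    # chunks, as a TCP stream would hand them to us
    wire = (msgpack.dumps({'a': 1}, use_bin_type=True)
            + msgpack.dumps({'b': 2}, use_bin_type=True))

    unpacker = msgpack.Unpacker(raw=False, use_list=False)
    for chunk in (wire[:3], wire[3:]):  # split mid-message on purpose
        unpacker.feed(chunk)
        for msg in unpacker:  # yields only fully decoded objects
            print(msg)
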
class Channel:
    """A channel to actors in other processes.

    Use this to talk to any micro-service daemon or other client(s) over
    a transport managed by ``trio``.
    """
    def __init__(
        self,
        destaddr: tuple = None,
        on_reconnect: Coroutine = None,
        auto_reconnect: bool = False,
        stream: trio.SocketStream = None,  # expected to be active
    ) -> None:
        self._recon_seq = on_reconnect
        self._autorecon = auto_reconnect
        self.squeue = StreamQueue(stream) if stream else None
        if self.squeue and destaddr:
            raise ValueError(
                f"A stream was provided with local addr {self.laddr}"
            )
        self._destaddr = destaddr or self.squeue.raddr
        # set after handshake - always uid of far end
        self.uid = None

    def __repr__(self):
        if self.squeue:
            return repr(
                self.squeue.stream.socket._sock).replace(
                    "socket.socket", "Channel")
        return object.__repr__(self)

    @property
    def laddr(self):
        return self.squeue.laddr if self.squeue else (None, None)

    @property
    def raddr(self):
        return self.squeue.raddr if self.squeue else (None, None)

    async def connect(self, destaddr: Tuple[str, int] = None, **kwargs):
        if self.connected():
            raise RuntimeError("channel is already connected?")
        destaddr = destaddr or self._destaddr
        stream = await trio.open_tcp_stream(*destaddr, **kwargs)
        self.squeue = StreamQueue(stream)
        return stream

    async def send(self, item):
        log.trace(f"send `{item}`")
        await self.squeue.put(item)

    async def recv(self):
        try:
            return await self.squeue.get()
        except trio.BrokenStreamError:
            if self._autorecon:
                await self._reconnect()
                return await self.recv()

    async def aclose(self, *args):
        log.debug(f"Closing {self}")
        await self.squeue.stream.aclose()

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, *args):
        await self.aclose(*args)

    async def __aiter__(self):
        return self.aiter_recv()

    async def _reconnect(self):
        """Handle connection failures by polling until a reconnect can be
        established.
        """
        down = False
        while True:
            try:
                with trio.move_on_after(3) as cancel_scope:
                    await self.connect()
                cancelled = cancel_scope.cancelled_caught
                if cancelled:
                    log.warn(
                        "Reconnect timed out after 3 seconds, retrying...")
                    continue
                else:
                    log.warn("Stream connection re-established!")
                    # run any reconnection sequence
                    on_recon = self._recon_seq
                    if on_recon:
                        await on_recon(self)
                    break
            except (OSError, ConnectionRefusedError):
                if not down:
                    down = True
                    log.warn(
                        f"Connection to {self.raddr} went down, waiting"
                        " for re-establishment")
                await trio.sleep(1)

    async def aiter_recv(self):
        """Async iterate items from underlying stream.
        """
        while True:
            try:
                async for item in self.squeue:
                    yield item
                    # sent = yield item
                    # if sent is not None:
                    #     # optimization, passing None through all the
                    #     # time is pointless
                    #     await self.squeue.put(sent)
            except trio.BrokenStreamError:
                if not self._autorecon:
                    raise
                await self.aclose()
                if self._autorecon:  # attempt reconnect
                    await self._reconnect()
                    continue
            else:
                return

    def connected(self):
        return self.squeue.connected() if self.squeue else False
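
This whole module now ships with the external `tractor` package, which is why `piker.brokers.core` above can annotate with `tractor.Channel`. Assuming the external copy kept this same API (constructor taking a `destaddr`, async context management, `send()`/`recv()`), basic usage looks like:

    import trio
    import tractor

    async def ping(addr=('127.0.0.1', 1616)):  # hypothetical server addr
        # connects on enter, closes the underlying stream on exit
        async with tractor.Channel(addr) as chan:
            await chan.send({'cmd': 'ping'})  # hypothetical payload
            print(await chan.recv())

    trio.run(ping)
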

piker/tractor.py (deleted)

@@ -1,899 +0,0 @@
"""
|
||||
tracor: An actor model micro-framework.
|
||||
"""
|
||||
from collections import defaultdict
|
||||
from functools import partial
|
||||
from typing import Coroutine
|
||||
import importlib
|
||||
import inspect
|
||||
import multiprocessing as mp
|
||||
import traceback
|
||||
import uuid
|
||||
|
||||
import trio
|
||||
from async_generator import asynccontextmanager
|
||||
|
||||
from .ipc import Channel
|
||||
from .log import get_console_log, get_logger
|
||||
|
||||
ctx = mp.get_context("forkserver")
|
||||
log = get_logger('tractor')
|
||||
|
||||
# set at startup and after forks
|
||||
_current_actor = None
|
||||
_default_arbiter_host = '127.0.0.1'
|
||||
_default_arbiter_port = 1616
|
||||
|
||||
|
||||
class ActorFailure(Exception):
|
||||
"General actor failure"
|
||||
|
||||
|
||||
class RemoteActorError(ActorFailure):
|
||||
"Remote actor exception bundled locally"
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def maybe_open_nursery(nursery=None):
|
||||
"""Create a new nursery if None provided.
|
||||
|
||||
Blocks on exit as expected if no input nursery is provided.
|
||||
"""
|
||||
if nursery is not None:
|
||||
yield nursery
|
||||
else:
|
||||
async with trio.open_nursery() as nursery:
|
||||
yield nursery
|
||||
|
||||
|
||||
async def _invoke(
|
||||
cid, chan, func, kwargs,
|
||||
treat_as_gen=False, raise_errs=False,
|
||||
task_status=trio.TASK_STATUS_IGNORED
|
||||
):
|
||||
"""Invoke local func and return results over provided channel.
|
||||
"""
|
||||
try:
|
||||
is_async_partial = False
|
||||
if isinstance(func, partial):
|
||||
is_async_partial = inspect.iscoroutinefunction(func.func)
|
||||
|
||||
if not inspect.iscoroutinefunction(func) and not is_async_partial:
|
||||
await chan.send({'return': func(**kwargs), 'cid': cid})
|
||||
else:
|
||||
coro = func(**kwargs)
|
||||
|
||||
if inspect.isasyncgen(coro):
|
||||
async for item in coro:
|
||||
# TODO: can we send values back in here?
|
||||
# it's gonna require a `while True:` and
|
||||
# some non-blocking way to retrieve new `asend()`
|
||||
# values from the channel:
|
||||
# to_send = await chan.recv_nowait()
|
||||
# if to_send is not None:
|
||||
# to_yield = await coro.asend(to_send)
|
||||
await chan.send({'yield': item, 'cid': cid})
|
||||
else:
|
||||
if treat_as_gen:
|
||||
# XXX: the async-func may spawn further tasks which push
|
||||
# back values like an async-generator would but must
|
||||
# manualy construct the response dict-packet-responses as
|
||||
# above
|
||||
await coro
|
||||
else:
|
||||
await chan.send({'return': await coro, 'cid': cid})
|
||||
|
||||
task_status.started()
|
||||
except Exception:
|
||||
if not raise_errs:
|
||||
await chan.send({'error': traceback.format_exc(), 'cid': cid})
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
async def result_from_q(q):
|
||||
"""Process a msg from a remote actor.
|
||||
"""
|
||||
first_msg = await q.get()
|
||||
if 'return' in first_msg:
|
||||
return 'return', first_msg, q
|
||||
elif 'yield' in first_msg:
|
||||
return 'yield', first_msg, q
|
||||
elif 'error' in first_msg:
|
||||
raise RemoteActorError(first_msg['error'])
|
||||
else:
|
||||
raise ValueError(f"{first_msg} is an invalid response packet?")
|
||||
|
||||
|
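
The wire protocol implied by `_invoke` and `result_from_q` is just msgpack-encoded dicts keyed by message type, with a caller id (`cid`) threading every request to its responses. Schematically (shapes taken from the code above; all values hypothetical):

    # request: run `ns.func(**kwargs)` on the remote actor
    cmd = {'cmd': ('some.module', 'some_func',
                   {'x': 1},               # kwargs
                   ('caller', '<uuid>'),   # requesting actor's uid
                   '<cid>')}               # caller id

    # responses, each echoing the caller id:
    ret = {'return': 42, 'cid': '<cid>'}                       # one-shot result
    item = {'yield': {'tick': 'data'}, 'cid': '<cid>'}         # async-gen item
    err = {'error': 'Traceback (most recent call last): ...', 'cid': '<cid>'}
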
async def _do_handshake(actor, chan):
    await chan.send(actor.uid)
    uid = await chan.recv()

    if not isinstance(uid, tuple):
        raise ValueError(f"{uid} is not a valid uid?!")

    chan.uid = uid
    log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
    return uid


class Actor:
    """The fundamental concurrency primitive.

    An *actor* is the combination of a regular Python or
    ``multiprocessing.Process`` executing a ``trio`` task tree, communicating
    with other actors through "portals" which provide a native async API
    around "channels".
    """
    is_arbiter = False

    def __init__(
        self,
        name: str,
        main: Coroutine = None,
        rpc_module_paths: [str] = [],
        statespace: dict = {},
        uid: str = None,
        allow_rpc: bool = True,
        outlive_main: bool = False,
    ):
        self.name = name
        self.uid = (name, uid or str(uuid.uuid1()))
        self.rpc_module_paths = rpc_module_paths
        self._mods = {}
        self.main = main
        # TODO: consider making this a dynamically defined
        # @dataclass once we get py3.7
        self.statespace = statespace
        self._allow_rpc = allow_rpc
        self._outlive_main = outlive_main

        # filled in by `_async_main` after fork
        self._peers = defaultdict(list)
        self._no_more_peers = trio.Event()
        self._no_more_peers.set()
        self._actors2calls = {}  # map {uids -> {callids -> waiter queues}}
        self._listeners = []
        self._parent_chan = None
        self._accept_host = None

    async def wait_for_peer(self, uid):
        """Wait for a connection back from a spawned actor with a given
        ``uid``.
        """
        log.debug(f"Waiting for peer {uid} to connect")
        event = self._peers.setdefault(uid, trio.Event())
        await event.wait()
        log.debug(f"{uid} successfully connected back to us")
        return event, self._peers[uid][-1]

    def load_namespaces(self):
        # We load namespaces after fork since this actor may
        # be spawned on a different machine from the original nursery
        # and we need to try and load the local module code (if it
        # exists)
        for path in self.rpc_module_paths:
            self._mods[path] = importlib.import_module(path)

    async def _stream_handler(
        self,
        stream: trio.SocketStream,
    ):
        """
        Entry point for new inbound connections to the channel server.
        """
        self._no_more_peers.clear()
        chan = Channel(stream=stream)
        log.info(f"New connection to us {chan}")

        # send/receive initial handshake response
        try:
            uid = await _do_handshake(self, chan)
        except StopAsyncIteration:
            log.warn(f"Channel {chan} failed to handshake")
            return

        # channel tracking
        event_or_chans = self._peers.pop(uid, None)
        if isinstance(event_or_chans, trio.Event):
            # Instructing connection: this is likely a new channel to
            # a recently spawned actor which we'd like to control via
            # async-rpc calls.
            log.debug(f"Waking channel waiters {event_or_chans.statistics()}")
            # Alert any task waiting on this connection to come up
            event_or_chans.set()
            event_or_chans.clear()  # consumer can wait on channel to close
        elif isinstance(event_or_chans, list):
            log.warn(
                f"already have channel(s) for {uid}:{event_or_chans}?"
            )
            # append new channel
            self._peers[uid].extend(event_or_chans)

        log.debug(f"Registered {chan} for {uid}")
        self._peers[uid].append(chan)

        # Begin channel management - respond to remote requests and
        # process received responses.
        try:
            await self._process_messages(chan)
        finally:
            # Drop ref to channel so it can be gc-ed and disconnected
            if chan is not self._parent_chan:
                log.debug(f"Releasing channel {chan}")
                chans = self._peers.get(chan.uid)
                chans.remove(chan)
                if not chans:
                    log.debug(f"No more channels for {chan.uid}")
                    self._peers.pop(chan.uid, None)
                if not self._peers:  # no more channels connected
                    self._no_more_peers.set()
                    log.debug(f"No more peer channels")
    def _push_result(self, actorid, cid, msg):
        assert actorid, f"`actorid` can't be {actorid}"
        q = self.get_waitq(actorid, cid)
        log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
        waiters = q.statistics().tasks_waiting_get
        if not waiters:
            log.warn(
                f"No tasks are currently waiting for results from call {cid}?")
        q.put_nowait(msg)

    def get_waitq(self, actorid, cid):
        log.debug(f"Registering for callid {cid} queue results from {actorid}")
        cids2qs = self._actors2calls.setdefault(actorid, {})
        return cids2qs.setdefault(cid, trio.Queue(1000))

    async def send_cmd(self, chan, ns, func, kwargs):
        """Send a ``'cmd'`` message to a remote actor and return a
        caller id and a ``trio.Queue`` that can be used to wait for
        responses delivered by the local message processing loop.
        """
        cid = str(uuid.uuid1())
        q = self.get_waitq(chan.uid, cid)
        log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
        await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
        return cid, q

    async def _process_messages(self, chan, treat_as_gen=False):
        """Process messages async-RPC style.

        Process rpc requests and deliver retrieved responses from channels.
        """
        # TODO: once https://github.com/python-trio/trio/issues/467 gets
        # worked out we'll likely want to use that!
        log.debug(f"Entering msg loop for {chan}")
        async with trio.open_nursery() as nursery:
            try:
                async for msg in chan.aiter_recv():
                    if msg is None:  # terminate sentinel
                        log.debug(f"Cancelling all tasks for {chan}")
                        nursery.cancel_scope.cancel()
                        log.debug(f"Terminating msg loop for {chan}")
                        break
                    log.debug(f"Received msg {msg}")
                    cid = msg.get('cid')
                    if cid:  # deliver response to local caller/waiter
                        self._push_result(chan.uid, cid, msg)
                        if 'error' in msg:
                            # TODO: need something better than this slop
                            raise RemoteActorError(msg['error'])
                        log.debug(f"Waiting on next msg for {chan}")
                        continue
                    else:
                        ns, funcname, kwargs, actorid, cid = msg['cmd']

                        log.debug(
                            f"Processing request from {actorid}\n"
                            f"{ns}.{funcname}({kwargs})")
                        if ns == 'self':
                            func = getattr(self, funcname)
                        else:
                            func = getattr(self._mods[ns], funcname)

                        # spin up a task for the requested function
                        sig = inspect.signature(func)
                        treat_as_gen = False
                        if 'chan' in sig.parameters:
                            assert 'cid' in sig.parameters, \
                                f"{func} must accept a `cid` (caller id) kwarg"
                            kwargs['chan'] = chan
                            kwargs['cid'] = cid
                            # TODO: eventually we want to be more stringent
                            # about what is considered a far-end async-generator.
                            # Right now both actual async gens and any async
                            # function which declares a `chan` kwarg in its
                            # signature will be treated as one.
                            treat_as_gen = True

                        log.debug(f"Spawning task for {func}")
                        nursery.start_soon(
                            _invoke, cid, chan, func, kwargs, treat_as_gen,
                            name=funcname
                        )
                        log.debug(f"Waiting on next msg for {chan}")
                else:  # channel disconnect
                    log.debug(f"{chan} disconnected")
            except trio.ClosedStreamError:
                log.error(f"{chan} broke")

        log.debug(f"Exiting msg loop for {chan}")
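
`send_cmd` plus `_push_result` implement request/response correlation: each outbound `'cmd'` gets a fresh `cid`, a queue is registered under `(actor uid, cid)`, and whatever the message loop later receives for that id gets pushed onto it. The bookkeeping reduced to plain dicts and lists (a sketch, not the shipped code):

    import uuid

    _actors2calls = {}  # {actor_uid: {cid: response "queue"}}

    def get_waitq(actorid, cid):
        # one response queue per (actor, caller-id) pair
        return _actors2calls.setdefault(actorid, {}).setdefault(cid, [])

    cid = str(uuid.uuid1())               # new caller id per request
    q = get_waitq(('brokerd', 'x'), cid)  # registered before sending
    # ... the msg loop later does the equivalent of:
    get_waitq(('brokerd', 'x'), cid).append({'return': 42, 'cid': cid})
    assert q == [{'return': 42, 'cid': cid}]
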
    def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):
        # after fork routine which invokes a fresh ``trio.run``
        log.info(
            f"Started new {ctx.current_process()} for actor {self.uid}")
        global _current_actor
        _current_actor = self
        if loglevel:
            get_console_log(loglevel)
        log.debug(f"parent_addr is {parent_addr}")
        try:
            trio.run(partial(
                self._async_main, accept_addr, parent_addr=parent_addr))
        except KeyboardInterrupt:
            pass  # handle it the same way trio does?
        log.debug(f"Actor {self.uid} terminated")

    async def _async_main(
        self,
        accept_addr,
        arbiter_addr=(_default_arbiter_host, _default_arbiter_port),
        parent_addr=None,
        nursery=None
    ):
        """Start the channel server and main task.

        A "root-most" (or "top-level") nursery for this actor is opened here
        and when cancelled effectively cancels the actor.
        """
        result = None
        try:
            async with maybe_open_nursery(nursery) as nursery:
                self._root_nursery = nursery

                # Start up channel server
                host, port = accept_addr
                await nursery.start(partial(
                    self._serve_forever, accept_host=host, accept_port=port)
                )

                if parent_addr is not None:
                    # Connect back to the parent actor and conduct initial
                    # handshake (From this point on if we error ship the
                    # exception back to the parent actor)
                    chan = self._parent_chan = Channel(
                        destaddr=parent_addr,
                        on_reconnect=self.main
                    )
                    await chan.connect()
                    # initial handshake, report who we are, who they are
                    await _do_handshake(self, chan)

                # handle new connection back to parent optionally
                # begin responding to RPC
                if self._allow_rpc:
                    self.load_namespaces()
                    if self._parent_chan:
                        nursery.start_soon(
                            self._process_messages, self._parent_chan)

                # register with the arbiter if we're told its addr
                log.debug(f"Registering {self} for role `{self.name}`")
                async with get_arbiter(*arbiter_addr) as arb_portal:
                    await arb_portal.run(
                        'self', 'register_actor',
                        name=self.name, sockaddr=self.accept_addr)

                if self.main:
                    if self._parent_chan:
                        log.debug(f"Starting main task `{self.main}`")
                        # start "main" routine in a task
                        await nursery.start(
                            _invoke, 'main', self._parent_chan, self.main, {},
                            False, True  # treat_as_gen, raise_errs params
                        )
                    else:
                        # run directly
                        log.debug(f"Running `{self.main}` directly")
                        result = await self.main()

                # terminate local in-proc once its main completes
                log.debug(
                    f"Waiting for remaining peers {self._peers} to clear")
                await self._no_more_peers.wait()
                log.debug(f"All peer channels are complete")

                # tear down channel server
                if not self._outlive_main:
                    log.debug(f"Shutting down channel server")
                    self.cancel_server()

            # blocks here as expected if no nursery was provided until
            # the channel server is killed (i.e. this actor is
            # cancelled or signalled by the parent actor)
        except Exception:
            if self._parent_chan:
                log.exception("Actor errored:")
                await self._parent_chan.send(
                    {'error': traceback.format_exc(), 'cid': 'main'})
            else:
                raise
        finally:
            # UNregister actor from the arbiter
            try:
                if arbiter_addr is not None:
                    async with get_arbiter(*arbiter_addr) as arb_portal:
                        await arb_portal.run(
                            'self', 'register_actor',
                            name=self.name, sockaddr=self.accept_addr)
            except OSError:
                log.warn(f"Unable to unregister {self.name} from arbiter")

        return result

    async def _serve_forever(
        self,
        *,
        # (host, port) to bind for channel server
        accept_host=None,
        accept_port=0,
        task_status=trio.TASK_STATUS_IGNORED
    ):
        """Main coroutine: connect back to the parent, spawn main task, begin
        listening for new messages.

        """
        async with trio.open_nursery() as nursery:
            self._server_nursery = nursery
            # TODO: might want to consider having a separate nursery
            # for the stream handler such that the server can be cancelled
            # whilst leaving existing channels up
            listeners = await nursery.start(
                partial(
                    trio.serve_tcp,
                    self._stream_handler,
                    handler_nursery=self._root_nursery,
                    port=accept_port, host=accept_host,
                )
            )
            log.debug(
                f"Started tcp server(s) on {[l.socket for l in listeners]}")
            self._listeners.extend(listeners)
            task_status.started()

    def cancel(self):
        """This cancels the internal root-most nursery thereby gracefully
        cancelling (for all intents and purposes) this actor.
        """
        self._root_nursery.cancel_scope.cancel()

    def cancel_server(self):
        """Cancel the internal channel server nursery thereby
        preventing any new inbound connections from being established.
        """
        self._server_nursery.cancel_scope.cancel()

    @property
    def accept_addr(self):
        """Primary address to which the channel server is bound.
        """
        try:
            return self._listeners[0].socket.getsockname()
        except OSError:
            return

    def get_parent(self):
        return Portal(self._parent_chan)

    def get_chans(self, actorid):
        return self._peers[actorid]
class Arbiter(Actor):
    """A special actor who knows all the other actors and always has
    access to the top level nursery.

    The arbiter is by default the first actor spawned on each host
    and is responsible for keeping track of all other actors for
    coordination purposes. If a new main process is launched and an
    arbiter is already running that arbiter will be used.
    """
    _registry = defaultdict(list)
    is_arbiter = True

    def find_actor(self, name):
        return self._registry[name]

    def register_actor(self, name, sockaddr):
        self._registry[name].append(sockaddr)

    def unregister_actor(self, name, sockaddr):
        sockaddrs = self._registry.get(name)
        if sockaddrs:
            try:
                sockaddrs.remove(sockaddr)
            except ValueError:
                pass
class Portal:
    """A 'portal' to a(n) (remote) ``Actor``.

    Allows for invoking remote routines and receiving results through an
    underlying ``tractor.Channel`` as though the remote (async)
    function / generator was invoked locally.

    Think of this like a native async IPC API.
    """
    def __init__(self, channel):
        self.channel = channel

    async def aclose(self):
        log.debug(f"Closing {self}")
        # XXX: won't work until https://github.com/python-trio/trio/pull/460
        # gets in!
        await self.channel.aclose()

    async def run(self, ns, func, **kwargs):
        """Submit a function to be scheduled and run by actor, return its
        (stream of) result(s).
        """
        # TODO: note this needs some serious work and thinking about how
        # to make async-generators the fundamental IPC API over channels!
        # (think `yield from`, `gen.send()`, and functional reactive stuff)
        chan = self.channel
        # ship a function call request to the remote actor
        actor = current_actor()

        cid, q = await actor.send_cmd(chan, ns, func, kwargs)
        # wait on first response msg
        resptype, first_msg, q = await result_from_q(q)

        if resptype == 'yield':

            async def yield_from_q():
                yield first_msg['yield']
                try:
                    async for msg in q:
                        try:
                            yield msg['yield']
                        except KeyError:
                            raise RemoteActorError(msg['error'])
                except GeneratorExit:
                    log.debug(f"Cancelling async gen call {cid} to {chan.uid}")

            return yield_from_q()

        elif resptype == 'return':
            return first_msg['return']
        else:
            raise ValueError(f"Unknown msg response type: {first_msg}")
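
`Portal.run` therefore behaves like a remote call that can also hand back an async generator when the far end streams. A hedged usage sketch (module and function names are hypothetical):

    # one-shot call: returns the remote function's result
    result = await portal.run('mymod', 'compute', x=1)

    # streaming call: a remote async-gen comes back as an async iterable
    agen = await portal.run('mymod', 'stream_ticks', tickers=['ENE'])
    async for tick in agen:
        print(tick)
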
@asynccontextmanager
async def open_portal(channel, nursery=None):
    """Open a ``Portal`` through the provided ``channel``.

    Spawns a background task to handle rpc message processing.
    """
    actor = current_actor()
    assert actor
    was_connected = False

    async with maybe_open_nursery(nursery) as nursery:

        if not channel.connected():
            await channel.connect()
            was_connected = True

        if channel.uid is None:
            await _do_handshake(actor, channel)

        if not actor.get_chans(channel.uid):
            # actor is not currently managing this channel
            actor._peers[channel.uid].append(channel)

        nursery.start_soon(actor._process_messages, channel)
        yield Portal(channel)

        # cancel background msg loop task
        nursery.cancel_scope.cancel()
        if was_connected:
            actor._peers[channel.uid].remove(channel)
            await channel.aclose()


class LocalPortal:
    """A 'portal' to a local ``Actor``.

    A compatibility shim for normal portals but for invoking functions
    using an in process actor instance.
    """
    def __init__(self, actor):
        self.actor = actor

    async def run(self, ns, func, **kwargs):
        """Run a requested function locally and return its result.
        """
        obj = self.actor if ns == 'self' else importlib.import_module(ns)
        func = getattr(obj, func)
        return func(**kwargs)
class ActorNursery:
    """Spawn scoped subprocess actors.
    """
    def __init__(self, actor, supervisor=None):
        self.supervisor = supervisor
        self._actor = actor
        # We'll likely want some way to cancel all sub-actors eventually
        # self.cancel_scope = cancel_scope
        self._children = {}

    async def __aenter__(self):
        return self

    async def start_actor(
        self,
        name: str,
        main=None,
        bind_addr=('127.0.0.1', 0),
        statespace=None,
        rpc_module_paths=None,
        outlive_main=False,  # sub-actors die when their main task completes
        loglevel=None,  # set console logging per subactor
    ):
        actor = Actor(
            name,
            # modules allowed to invoke funcs from
            rpc_module_paths=rpc_module_paths,
            statespace=statespace,  # global proc state vars
            main=main,  # main coroutine to be invoked
            outlive_main=outlive_main,
        )
        parent_addr = self._actor.accept_addr
        assert parent_addr
        proc = ctx.Process(
            target=actor._fork_main,
            args=(bind_addr, parent_addr, loglevel),
            daemon=True,
            name=name,
        )
        proc.start()
        if not proc.is_alive():
            raise ActorFailure("Couldn't start sub-actor?")

        # wait for actor to spawn and connect back to us
        # channel should have handshake completed by the
        # local actor by the time we get a ref to it
        event, chan = await self._actor.wait_for_peer(actor.uid)
        # channel is up, get queue which delivers result from main routine
        main_q = self._actor.get_waitq(actor.uid, 'main')
        self._children[(name, proc.pid)] = (actor, proc, main_q)

        return Portal(chan)

    async def wait(self):

        async def wait_for_proc(proc):
            # TODO: timeout block here?
            if proc.is_alive():
                await trio.hazmat.wait_readable(proc.sentinel)
            # please god don't hang
            proc.join()
            log.debug(f"Joined {proc}")

        # unblocks when all waiter tasks have completed
        async with trio.open_nursery() as nursery:
            for subactor, proc, main_q in self._children.values():
                nursery.start_soon(wait_for_proc, proc)

    async def cancel(self, hard_kill=False):
        log.debug(f"Cancelling nursery")
        for subactor, proc, main_q in self._children.values():
            if proc is mp.current_process():
                # XXX: does this even make sense?
                await subactor.cancel()
            else:
                if hard_kill:
                    log.warn(f"Hard killing subactors {self._children}")
                    proc.terminate()
                    # send KeyBoardInterrupt (trio abort signal) to underlying
                    # sub-actors
                    # os.kill(proc.pid, signal.SIGINT)
                else:
                    # send cancel cmd - likely no response from subactor
                    actor = self._actor
                    chans = actor.get_chans(subactor.uid)
                    if chans:
                        for chan in chans:
                            await actor.send_cmd(chan, 'self', 'cancel', {})
                    else:
                        log.warn(
                            f"Channel for {subactor.uid} is already down?")
        log.debug(f"Waiting on all subactors to complete")
        await self.wait()
        log.debug(f"All subactors for {self} have terminated")

    async def __aexit__(self, etype, value, tb):
        """Wait on all subactors' main routines to complete.
        """
        async def wait_for_actor(actor, proc, q):
            if proc.is_alive():
                ret_type, msg, q = await result_from_q(q)
                log.info(f"{actor.uid} main task completed with {msg}")
                if not actor._outlive_main:
                    # trigger msg loop to break
                    chans = self._actor.get_chans(actor.uid)
                    for chan in chans:
                        log.info(f"Signalling msg loop exit for {actor.uid}")
                        await chan.send(None)

        if etype is not None:
            log.warn(f"{current_actor().uid} errored with {etype}, "
                     "cancelling actor nursery")
            await self.cancel()
        else:
            log.debug(f"Waiting on subactors to complete")
            async with trio.open_nursery() as nursery:
                for subactor, proc, main_q in self._children.values():
                    nursery.start_soon(wait_for_actor, subactor, proc, main_q)

            await self.wait()
        log.debug(f"Nursery teardown complete")
def current_actor() -> Actor:
    """Get the process-local actor instance.
    """
    return _current_actor


@asynccontextmanager
async def open_nursery(supervisor=None, loglevel='WARNING'):
    """Create and yield a new ``ActorNursery``.
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    # TODO: figure out supervisors from erlang
    async with ActorNursery(current_actor(), supervisor) as nursery:
        yield nursery
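
# (editor sketch) Minimal use of the nursery API above, mirroring the
# ``spawn()`` test later in this commit; the actor name and module path are
# illustrative only:
#
#   async def main():
#       async with open_nursery() as nursery:
#           portal = await nursery.start_actor(
#               'sub-actor',
#               rpc_module_paths=['piker.brokers.core'],
#           )
#           # __aexit__ blocks here until all subactor main tasks complete
#
#   tractor.run(main)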


class NoArbiterFound(Exception):
    "Couldn't find the arbiter?"


async def start_actor(actor, host, port, arbiter_addr, nursery=None):
    """Spawn a local actor by starting a task to execute its main
    async function.

    Blocks if no nursery is provided, in which case it is expected the nursery
    provider is responsible for waiting on the task to complete.
    """
    # assign process-local actor
    global _current_actor
    _current_actor = actor

    # start local channel-server and fake the portal API
    # NOTE: this won't block since we provide the nursery
    log.info(f"Starting local {actor} @ {host}:{port}")

    await actor._async_main(
        accept_addr=(host, port),
        parent_addr=None,
        arbiter_addr=arbiter_addr,
        nursery=nursery,
    )
    # XXX: If spawned locally, the actor is cancelled when this
    # context is complete given that there are no more active
    # peer channels connected to it.
    if not actor._outlive_main:
        actor.cancel_server()

    # unset module state
    _current_actor = None
    log.info("Completed async main")


@asynccontextmanager
async def _connect_chan(host, port):
    """Attempt to connect to an arbiter's channel server.
    Return the channel on success or None on failure.
    """
    chan = Channel((host, port))
    await chan.connect()
    yield chan
    await chan.aclose()


@asynccontextmanager
async def get_arbiter(host, port):
    """Return a portal instance connected to a local or remote
    arbiter.
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    if actor.is_arbiter:
        # we're already the arbiter
        # (likely a re-entrant call from the arbiter actor)
        yield LocalPortal(actor)
    else:
        async with _connect_chan(host, port) as chan:
            async with open_portal(chan) as arb_portal:
                yield arb_portal


@asynccontextmanager
async def find_actor(
    name,
    arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port)
):
    """Ask the arbiter to find actor(s) by name.

    Yields a connected portal to the last matching actor known to
    the arbiter, or ``None`` if no match is found.
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    async with get_arbiter(*arbiter_sockaddr) as arb_portal:
        sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
        # TODO: return portals to all available actors - for now just
        # the first one we find
        if sockaddrs:
            sockaddr = sockaddrs[-1]
            async with _connect_chan(*sockaddr) as chan:
                async with open_portal(chan) as portal:
                    yield portal
        else:
            yield
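
# (editor sketch) Locating a peer actor by name via ``find_actor()`` above;
# 'brokerd' matches the daemon name used elsewhere in piker, and the RPC
# kwargs shown are illustrative:
#
#   async with find_actor('brokerd') as portal:
#       if portal is not None:  # an actor by that name is registered
#           await portal.run(
#               'piker.brokers.core', 'modify_quote_stream',
#               broker='robinhood', tickers=[])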


async def _main(async_fn, args, kwargs, name, arbiter_addr):
    """Async entry point for ``tractor``.
    """
    main = partial(async_fn, *args) if async_fn else None
    arbiter_addr = (host, port) = arbiter_addr or (
        _default_arbiter_host, _default_arbiter_port)
    # make a temporary connection to see if an arbiter exists
    arbiter_found = False
    try:
        async with _connect_chan(host, port):
            arbiter_found = True
    except OSError:
        log.warn(f"No actor could be found @ {host}:{port}")

    if arbiter_found:  # we were able to connect to an arbiter
        log.info(f"Arbiter seems to exist @ {host}:{port}")
        # create a local actor and start up its main routine/task
        actor = Actor(
            name or 'anonymous',
            main=main,
            **kwargs
        )
        host, port = (_default_arbiter_host, 0)
    else:
        # start this local actor as the arbiter
        actor = Arbiter(name or 'arbiter', main=main, **kwargs)

    await start_actor(actor, host, port, arbiter_addr=arbiter_addr)
    # Creates an internal nursery which shouldn't be cancelled even if
    # the one opened below is (this is desirable because the arbiter should
    # stay up until a re-election process has taken place - which is not
    # implemented yet FYI).


def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs):
    """Run a trio-actor async function in process.

    This is tractor's main entry point and the start point for any
    async actor.
    """
    return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr)
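
# (editor sketch) Tying the pieces together: ``run()`` is the single entry
# point per its docstring, and an arbiter is started in-process when none is
# found at ``arbiter_addr`` (this mirrors the tests further down):
#
#   import trio
#   import tractor
#
#   async def main():
#       # we are now the process-local actor
#       assert tractor.current_actor() is not None
#       await trio.sleep(0.1)
#
#   tractor.run(main, name='demo')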


@@ -371,7 +371,7 @@ async def update_quotes(
         grid.render_rows(cache)
         log.debug("Waiting on quotes")
 
-    log.warn("Server connection dropped")
+    log.warn("`brokerd` connection dropped")
     nursery.cancel_scope.cancel()

@@ -391,7 +391,6 @@ async def _async_main(name, portal, tickers, brokermod, rate):
         "piker.brokers.core", 'start_quote_stream',
         broker=brokermod.name, tickers=tickers)
 
-    try:
     async with trio.open_nursery() as nursery:
         # get first quotes response
         log.debug("Waiting on first quote...")

@@ -457,11 +456,14 @@ async def _async_main(name, portal, tickers, brokermod, rate):
         nursery.start_soon(
             update_quotes, nursery, brokermod, widgets, agen, sd, quotes)
 
+        try:
             # Trio-kivy entry point.
             await async_runTouchApp(widgets['root'])  # run kivy
             await agen.aclose()  # cancel async gen call
         finally:
-            # un-subscribe from symbols stream
+            # un-subscribe from symbols stream (cancel if brokerd
+            # was already torn down - say by SIGINT)
+            with trio.move_on_after(0.2):
                 await portal.run(
                     "piker.brokers.core", 'modify_quote_stream',
                     broker=brokermod.name, tickers=[])
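
# (editor note) The ``portal.run()`` calls in this hunk follow tractor's RPC
# convention used throughout the commit: a module-path string, a function
# name, then keyword args forwarded verbatim to the remote function. Passing
# ``tickers=[]`` asks ``modify_quote_stream`` to drop every subscription,
# and the 0.2s ``move_on_after`` bounds the call in case ``brokerd`` is
# already gone. An illustrative re-subscribe (the ticker list is made up):
#
#   await portal.run("piker.brokers.core", 'modify_quote_stream',
#                    broker=brokermod.name, tickers=['AAPL'])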

@@ -1,12 +1,8 @@
 """
 Actor model API testing
 """
-import time
-from functools import partial
 
 import pytest
 import trio
-from piker import tractor
+import tractor
 
 
 @pytest.fixture
@@ -14,78 +10,6 @@ def us_symbols():
     return ['TSLA', 'AAPL', 'CGC', 'CRON']
 
 
-@pytest.mark.trio
-async def test_no_arbiter():
-    """An arbiter must be established before any nurseries
-    can be created.
-
-    (In other words ``tractor.run`` must be used instead of ``trio.run`` as is
-    done by the ``pytest-trio`` plugin.)
-    """
-    with pytest.raises(RuntimeError):
-        with tractor.open_nursery():
-            pass
-
-
-def test_local_actor_async_func():
-    """Verify a simple async function in-process.
-    """
-    nums = []
-
-    async def print_loop():
-        # arbiter is started in-proc if one does not already exist
-        assert tractor.current_actor().is_arbiter
-
-        for i in range(10):
-            nums.append(i)
-            await trio.sleep(0.1)
-
-    start = time.time()
-    tractor.run(print_loop)
-
-    # ensure the sleeps were actually awaited
-    assert time.time() - start >= 1
-    assert nums == list(range(10))
-
-
-# NOTE: this func must be defined at module level in order for the
-# internal pickling infra of the forkserver to work
-async def spawn(is_arbiter):
-    statespace = {'doggy': 10, 'kitty': 4}
-    namespaces = ['piker.brokers.core']
-
-    await trio.sleep(0.1)
-    actor = tractor.current_actor()
-    assert actor.is_arbiter == is_arbiter
-
-    # arbiter should always have an empty statespace as it's redundant
-    assert actor.statespace == statespace
-
-    if actor.is_arbiter:
-        async with tractor.open_nursery() as nursery:
-            # forks here
-            portal = await nursery.start_actor(
-                'sub-actor',
-                main=partial(spawn, False),
-                statespace=statespace,
-                rpc_module_paths=namespaces,
-            )
-            assert len(nursery._children) == 1
-            assert portal.channel.uid in tractor.current_actor()._peers
-    else:
-        return 10
-
-
-def test_local_arbiter_subactor_global_state():
-    statespace = {'doggy': 10, 'kitty': 4}
-    tractor.run(
-        spawn,
-        True,
-        name='arbiter',
-        statespace=statespace,
-    )
-
-
 async def rx_price_quotes_from_brokerd(us_symbols):
     """Verify we can spawn a daemon actor and retrieve streamed price data.
     """